use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::Path;
use arrow::bitmap::MutableBitmap;
use memmap2::Mmap;
use polars::prelude::*;
use rayon::prelude::*;
use super::header::{self, YxdbHeader, HEADER_SIZE};
use super::lzf::{self, CompressionAlgorithm};
use super::record;
use super::record::FieldValue;
use crate::error::{Result, YxdbError};
use crate::field::{FieldMeta, FieldType};
/// Reader over a single YXDB file.
///
/// Holds the buffered file handle, the parsed header and field metadata, and
/// the scratch buffers used while decoding compressed record blocks.
pub struct YxdbReader {
// Buffered handle to the file; after `open` it has consumed the header and
// the metadata XML, so subsequent reads yield record data.
stream: BufReader<File>,
/// Header parsed from the first `HEADER_SIZE` bytes of the file.
pub header: YxdbHeader,
/// Field descriptors parsed from the metadata XML.
pub fields: Vec<FieldMeta>,
/// Metadata XML decoded from its UTF-16LE on-disk form.
pub meta_xml: String,
/// Byte length of the fixed portion of one record (end of the last field).
pub(crate) fixed_size: usize,
// True when at least one field reports `is_variable()`.
has_var: bool,
// Compression algorithm derived from `header.compression_version`;
// `None` means record bytes are read straight from the stream.
compression: Option<CompressionAlgorithm>,
// Holds the most recently decoded block.
lzf_out: Vec<u8>,
// Read cursor into `lzf_out`.
lzf_out_idx: usize,
// Number of valid bytes currently in `lzf_out`.
lzf_out_size: usize,
// `lzf_in`: scratch buffer for a block's compressed bytes.
// `current_record`: counter bumped on every `next_record` call.
lzf_in: Vec<u8>, current_record: u64,
}
impl YxdbReader {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let file = File::open(path.as_ref())?;
let mut stream = BufReader::new(file);
let mut header_buf = [0u8; HEADER_SIZE];
match stream.read_exact(&mut header_buf) {
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
return Err(YxdbError::InvalidFile(
"file too small to be a valid YXDB (< 512 bytes)".into(),
));
}
Err(e) => return Err(e.into()),
Ok(_) => {}
}
let header = YxdbHeader::parse(&header_buf)?;
let meta_byte_len = header.meta_info_size as usize * 2;
let mut meta_bytes = vec![0u8; meta_byte_len];
stream.read_exact(&mut meta_bytes)?;
let xml_bytes = if meta_byte_len >= 2 {
&meta_bytes[..meta_byte_len - 2]
} else {
&meta_bytes
};
let meta_xml = header::decode_utf16_le(xml_bytes);
let fields = header::parse_meta_xml(&meta_xml)?;
let fixed_size: usize = fields
.last()
.map(|f| f.offset + f.field_type.fixed_bytes(f.size))
.unwrap_or(0);
const MAX_REASONABLE_RECORD_SIZE: usize = 64 * 1024 * 1024; if fixed_size > MAX_REASONABLE_RECORD_SIZE {
return Err(YxdbError::InvalidFile(format!(
"computed fixed record size ({fixed_size} bytes) exceeds sanity limit — \
the XML metadata is likely corrupt"
)));
}
for f in &fields {
let end = f.offset + f.field_type.fixed_bytes(f.size);
if end > fixed_size {
return Err(YxdbError::InvalidFile(format!(
"field '{}' (offset {} + {} bytes) exceeds computed fixed record size ({})",
f.name,
f.offset,
f.field_type.fixed_bytes(f.size),
fixed_size
)));
}
}
let has_var = fields.iter().any(|f| f.field_type.is_variable());
let compression = CompressionAlgorithm::from_version_id(header.compression_version)?;
Ok(YxdbReader {
stream,
header,
fields,
meta_xml,
fixed_size,
has_var,
compression,
lzf_out: vec![0u8; 262144],
lzf_out_idx: 0,
lzf_out_size: 0,
lzf_in: Vec::with_capacity(262144),
current_record: 0,
})
}
/// Reads the next record into `buf`, resizing it to the record's length.
///
/// Returns `Ok(false)` once more records have been requested than the header
/// declares; `buf` is left untouched in that case. For files with variable
/// fields the record is the fixed part, a 4-byte little-endian length, and
/// that many bytes of variable data.
pub fn next_record(&mut self, buf: &mut Vec<u8>) -> Result<bool> {
    self.current_record += 1;
    if self.current_record > self.header.num_records {
        return Ok(false);
    }

    let fixed = self.fixed_size;
    if !self.has_var {
        buf.resize(fixed, 0);
        self.read_bytes(&mut buf[..fixed])?;
        return Ok(true);
    }

    // Fixed part plus the length prefix of the variable section.
    let prefix_end = fixed + 4;
    buf.resize(prefix_end, 0);
    self.read_bytes(&mut buf[..prefix_end])?;

    let mut len_bytes = [0u8; 4];
    len_bytes.copy_from_slice(&buf[fixed..prefix_end]);
    let var_len = u32::from_le_bytes(len_bytes) as usize;

    let record_end = prefix_end + var_len;
    buf.resize(record_end, 0);
    self.read_bytes(&mut buf[prefix_end..record_end])?;
    Ok(true)
}
/// Consumes the reader and loads every column of the file into a `DataFrame`.
///
/// Delegates to `into_dataframe_projected` with no column selection.
pub fn into_dataframe(self) -> Result<DataFrame> {
self.into_dataframe_projected(None)
}
/// Consumes the reader and loads the file into a `DataFrame`, optionally
/// restricted to the named `columns` (returned in the order requested).
///
/// The file is memory-mapped and decoded in bulk; the position of the
/// streaming cursor used by `next_record` is not consulted.
///
/// # Errors
///
/// Returns `YxdbError::InvalidFile` when a requested column does not exist or
/// a record extends past the available data, `YxdbError::ConversionError` when
/// the `DataFrame` cannot be assembled, and any error raised while mapping or
/// decompressing the file.
pub fn into_dataframe_projected(mut self, columns: Option<&[&str]>) -> Result<DataFrame> {
let num_records = self.header.num_records as usize;
// Move the field list out so it can be borrowed while other parts of
// `self` are moved below.
let fields = std::mem::take(&mut self.fields);
// Empty file: build zero-length columns with the right dtypes and return.
if num_records == 0 {
let projected_fields: Vec<&FieldMeta> = match columns {
Some(names) => {
let field_map: std::collections::HashMap<&str, &FieldMeta> =
fields.iter().map(|f| (f.name.as_str(), f)).collect();
let unknown: Vec<&str> = names
.iter()
.copied()
.filter(|n| !field_map.contains_key(n))
.collect();
if !unknown.is_empty() {
return Err(YxdbError::InvalidFile(format!(
"requested columns not found in file: {:?}",
unknown
)));
}
names
.iter()
.filter_map(|n| field_map.get(n).copied())
.collect()
}
None => fields.iter().collect(),
};
let empty_cols: Vec<Column> = projected_fields
.iter()
.map(|f| {
let series = empty_series_for_field(f);
Column::from(series)
})
.collect();
return DataFrame::new(0, empty_cols)
.map_err(|e| YxdbError::ConversionError(e.to_string()));
}
let fixed_size = self.fixed_size;
let has_var = self.has_var;
let compression = self.compression;
let record_block_index_pos = self.header.record_block_index_pos;
// Replace the buffered stream with a read-only memory map of the file.
let mmap = {
let inner_stream = self.stream;
let file = inner_stream.into_inner();
// NOTE(review): soundness of the mapping presumably relies on the file
// not being modified while mapped — confirm against memmap2's contract.
unsafe { Mmap::map(&file) }?
};
// Record data starts right after the header and the metadata XML.
let data_offset = {
let meta_byte_len = self.header.meta_info_size as usize * 2;
HEADER_SIZE + meta_byte_len
};
// Record data ends at the block index when the header gives a position
// beyond the data start; otherwise it runs to the end of the file.
let block_data_end = if record_block_index_pos > data_offset as i64 {
(record_block_index_pos as usize).min(mmap.len())
} else {
mmap.len()
};
let raw_data = &mmap[data_offset..block_data_end];
let all_data: Cow<'_, [u8]> = match compression {
None => {
Cow::Borrowed(raw_data)
}
Some(algo) => {
let mut data = decompress_blocks(raw_data, algo, None)?;
// When the plain decode does not walk cleanly as records and the file
// has a spatial index, decode again keeping only blocks that look
// like record data.
if self.header.has_spatial_index()
&& has_var
&& !records_fit(&data, fixed_size, num_records)
{
data = decompress_blocks(raw_data, algo, Some(fixed_size))?;
}
Cow::Owned(data)
}
};
// Variable-length records need a scan to find each record's extent.
let bounds = if has_var {
scan_variable_record_bounds(&all_data, fixed_size, num_records)
} else {
RecordBounds::Fixed { fixed_size }
};
// Resolve the requested column names to field descriptors, rejecting
// names that are not present in the file.
let projected_fields: Vec<&FieldMeta> = match columns {
Some(names) => {
let name_set: HashSet<&str> = names.iter().copied().collect();
let field_map: HashMap<&str, &FieldMeta> = fields
.iter()
.filter(|f| name_set.contains(f.name.as_str()))
.map(|f| (f.name.as_str(), f))
.collect();
let unknown: Vec<&str> = names
.iter()
.copied()
.filter(|n| !field_map.contains_key(n))
.collect();
if !unknown.is_empty() {
return Err(YxdbError::InvalidFile(format!(
"requested columns not found in file: {:?}",
unknown
)));
}
names
.iter()
.filter_map(|n| field_map.get(n).copied())
.collect()
}
None => fields.iter().collect(),
};
// Columns are built in parallel only when there are enough of them or
// enough data; otherwise they are built sequentially.
const MIN_COLS_FOR_PAR: usize = 6;
const MIN_DATA_FOR_PAR: usize = 10 * 1024 * 1024;
let built_columns: Result<Vec<Column>> =
if projected_fields.len() >= MIN_COLS_FOR_PAR || all_data.len() >= MIN_DATA_FOR_PAR {
projected_fields
.par_iter()
.map(|field| build_column(field, &all_data, &bounds, num_records))
.collect()
} else {
projected_fields
.iter()
.map(|field| build_column(field, &all_data, &bounds, num_records))
.collect()
};
let cols = built_columns?;
// The frame height is taken from the first column (0 when none were selected).
let height = cols.first().map_or(0, |c| c.len());
DataFrame::new(height, cols).map_err(|e| YxdbError::ConversionError(e.to_string()))
}
/// Reads up to `batch_size` further records and returns them as a `DataFrame`.
///
/// `columns` optionally restricts (and orders) the output columns. Returns
/// `Ok(None)` when the header's record count has already been consumed or
/// when no record could be read for this batch.
///
/// # Errors
///
/// `YxdbError::InvalidFile` for unknown column names,
/// `YxdbError::ConversionError` when the frame cannot be assembled, plus any
/// error from reading or decoding records.
pub fn next_batch(
    &mut self,
    batch_size: usize,
    columns: Option<&[&str]>,
) -> Result<Option<DataFrame>> {
    let total = self.header.num_records;
    if self.current_record >= total {
        return Ok(None);
    }
    let this_batch = ((total - self.current_record) as usize).min(batch_size);

    // Owned copies of the selected field descriptors, in output order.
    let projected_fields: Vec<FieldMeta> = match columns {
        None => self.fields.clone(),
        Some(names) => {
            let by_name: HashMap<&str, &FieldMeta> = self
                .fields
                .iter()
                .map(|f| (f.name.as_str(), f))
                .collect();
            let unknown: Vec<&str> = names
                .iter()
                .copied()
                .filter(|n| !by_name.contains_key(n))
                .collect();
            if !unknown.is_empty() {
                return Err(YxdbError::InvalidFile(format!(
                    "requested columns not found in file: {:?}",
                    unknown
                )));
            }
            names
                .iter()
                .filter_map(|n| by_name.get(n).map(|f| (*f).clone()))
                .collect()
        }
    };

    let mut builders: Vec<ColumnBuilder> = projected_fields
        .iter()
        .map(|f| ColumnBuilder::new(f, this_batch))
        .collect();

    let mut record_buf = Vec::with_capacity(self.fixed_size + 1024);
    let mut rows_read = 0usize;
    while rows_read < this_batch && self.next_record(&mut record_buf)? {
        for (builder, field) in builders.iter_mut().zip(&projected_fields) {
            builder.push_from_record(&record_buf, field)?;
        }
        rows_read += 1;
    }
    if rows_read == 0 {
        return Ok(None);
    }

    let cols: Vec<Column> = builders
        .into_iter()
        .zip(&projected_fields)
        .map(|(builder, field)| builder.into_series(&field.name).map(Column::from))
        .collect::<Result<_>>()?;
    let height = cols.first().map_or(0, |c| c.len());
    DataFrame::new(height, cols)
        .map(Some)
        .map_err(|e| YxdbError::ConversionError(e.to_string()))
}
/// Fills `dest` completely with the next record bytes.
///
/// Without compression the bytes come straight from the stream. Otherwise
/// they are copied out of the decoded block buffer, decoding further blocks
/// whenever the buffer has been drained.
fn read_bytes(&mut self, dest: &mut [u8]) -> Result<()> {
    if self.compression.is_none() {
        self.stream.read_exact(dest)?;
        return Ok(());
    }

    let mut filled = 0usize;
    while filled < dest.len() {
        if self.lzf_out_idx >= self.lzf_out_size {
            self.read_next_lzf_block()?;
        }
        let src_start = self.lzf_out_idx;
        // Copy whichever is smaller: what is still wanted or what is buffered.
        let chunk = (dest.len() - filled).min(self.lzf_out_size - src_start);
        dest[filled..filled + chunk]
            .copy_from_slice(&self.lzf_out[src_start..src_start + chunk]);
        self.lzf_out_idx = src_start + chunk;
        filled += chunk;
    }
    Ok(())
}
/// Loads the next block from the stream into `lzf_out` and rewinds the cursor.
///
/// Each block starts with a 4-byte little-endian word: the top bit marks a
/// block stored without compression, the low 31 bits give its length. For
/// files with a spatial index and variable fields, blocks that do not look
/// like record data are skipped.
///
/// # Panics
///
/// Panics when called on a reader without compression.
fn read_next_lzf_block(&mut self) -> Result<()> {
    let algo = self
        .compression
        .expect("read_next_lzf_block called with no compression");

    loop {
        let mut prefix = [0u8; 4];
        self.stream.read_exact(&mut prefix)?;
        let word = u32::from_le_bytes(prefix) as usize;
        let stored_raw = word & 0x80000000 != 0;
        let block_len = word & 0x7FFFFFFF;

        self.lzf_out_size = if stored_raw {
            if self.lzf_out.len() < block_len {
                self.lzf_out.resize(block_len, 0);
            }
            self.stream.read_exact(&mut self.lzf_out[..block_len])?;
            block_len
        } else {
            self.lzf_in.resize(block_len, 0);
            self.stream.read_exact(&mut self.lzf_in[..block_len])?;
            // Output buffer is sized to at least ten times the compressed length.
            let wanted = 262144usize.max(block_len * 10);
            if self.lzf_out.len() < wanted {
                self.lzf_out.resize(wanted, 0);
            }
            lzf::decompress_block(algo, &self.lzf_in[..block_len], &mut self.lzf_out)?
        };

        let skip = self.header.has_spatial_index()
            && self.has_var
            && !is_record_block(&self.lzf_out[..self.lzf_out_size], self.fixed_size);
        if !skip {
            self.lzf_out_idx = 0;
            return Ok(());
        }
    }
}
}
/// Reports whether `num_records` variable-length records can be walked inside
/// `data` without running past its end.
///
/// Each record is `fixed_size` bytes, a 4-byte little-endian length, then
/// that many bytes of variable data.
fn records_fit(data: &[u8], fixed_size: usize, num_records: usize) -> bool {
    let mut cursor = 0usize;
    for _ in 0..num_records {
        let len_at = cursor + fixed_size;
        let var_len = match data.get(len_at..len_at + 4) {
            Some(raw) => u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize,
            // The length prefix itself is missing or truncated.
            None => return false,
        };
        cursor = len_at + 4 + var_len;
        if cursor > data.len() {
            return false;
        }
    }
    true
}
/// Walks `data` as a sequence of variable-length records and returns the byte
/// offset at which each one ends.
///
/// The result always starts with `0`, so record `i` spans
/// `ends[i]..ends[i + 1]`. Scanning stops early when the next record's length
/// prefix does not fit in `data`, so the result may describe fewer than
/// `num_records` records. The last recorded end may lie beyond `data.len()`
/// when the final record's declared length overruns the buffer.
fn scan_variable_record_bounds(data: &[u8], fixed_size: usize, num_records: usize) -> RecordBounds {
    // `num_records` comes from the file header and may be garbage. Every
    // record needs at least `fixed_size + 4` bytes, so `data` bounds how many
    // entries can actually be produced; reserve no more than that.
    let max_possible = data.len() / fixed_size.saturating_add(4);
    let mut ends = Vec::with_capacity(num_records.min(max_possible) + 1);
    ends.push(0usize);

    let mut offset = 0usize;
    for _ in 0..num_records {
        let var_start = offset + fixed_size;
        if var_start + 4 > data.len() {
            break;
        }
        let var_len =
            u32::from_le_bytes(data[var_start..var_start + 4].try_into().unwrap()) as usize;
        offset = var_start + 4 + var_len;
        ends.push(offset);
    }
    RecordBounds::Variable { ends }
}
/// Heuristically checks whether `block_data` begins with well-formed
/// variable-length records.
///
/// Up to three records are probed from the start of the block. The block is
/// rejected as soon as one record's declared variable length would extend
/// past the end of the block; running out of bytes before a full length
/// prefix is not treated as a failure.
#[inline]
fn is_record_block(block_data: &[u8], fixed_size: usize) -> bool {
    const PROBES: usize = 3;
    let mut cursor = 0usize;
    for _ in 0..PROBES {
        let len_at = cursor + fixed_size;
        let var_len = match block_data.get(len_at..len_at + 4) {
            Some(raw) => u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize,
            // Not enough bytes left for another length prefix: stop probing.
            None => break,
        };
        cursor = len_at + 4 + var_len;
        if cursor > block_data.len() {
            return false;
        }
    }
    true
}
/// Decodes every length-prefixed block in `raw_data` into one contiguous buffer.
///
/// Each block is prefixed by a 4-byte little-endian word whose top bit marks a
/// block stored without compression and whose low 31 bits give the block
/// length. When `spatial_record_filter` is `Some(fixed_size)`, decoded blocks
/// that fail `is_record_block` are dropped from the output.
///
/// # Errors
///
/// Propagates the first error returned by `lzf::decompress_block_into`.
fn decompress_blocks(
raw_data: &[u8],
algo: CompressionAlgorithm,
spatial_record_filter: Option<usize>,
) -> Result<Vec<u8>> {
// Pass 1: index the blocks as (payload offset, payload length, stored-raw flag).
// A block whose payload would run past `raw_data` ends the scan.
let mut blocks: Vec<(usize, usize, bool)> = Vec::new(); let mut pos = 0usize;
while pos + 4 <= raw_data.len() {
let raw_len = u32::from_le_bytes(raw_data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let is_uncompressed = raw_len & 0x80000000 != 0;
let block_len = raw_len & 0x7FFFFFFF;
if pos + block_len > raw_data.len() {
break;
}
blocks.push((pos, block_len, is_uncompressed));
pos += block_len;
}
const BLOCK_SIZE: usize = 262144;
const MIN_BLOCKS_FOR_PAR: usize = 8;
let num_blocks = blocks.len();
// Pass 2: reserve an output region per block. Raw blocks need exactly their
// length; compressed blocks get max(BLOCK_SIZE, 10 x compressed length).
let mut block_offsets: Vec<usize> = Vec::with_capacity(num_blocks + 1);
block_offsets.push(0);
for (idx, &(_offset, length, is_uncompressed)) in blocks.iter().enumerate() {
let expected_output = if is_uncompressed {
length
} else {
BLOCK_SIZE.max(length * 10)
};
block_offsets.push(block_offsets[idx] + expected_output);
}
let max_total = *block_offsets.last().unwrap_or(&0);
let mut all_data: Vec<u8> = vec![0u8; max_total.max(1)];
let all_data_ptr = all_data.as_mut_ptr();
// Raw-pointer wrapper so worker threads can each write into their own
// region of `all_data`.
struct SendSyncPtr(*mut u8);
unsafe impl Send for SendSyncPtr {}
unsafe impl Sync for SendSyncPtr {}
impl SendSyncPtr {
#[inline]
#[allow(clippy::mut_from_ref)]
unsafe fn slice_mut(&self, offset: usize, len: usize) -> &mut [u8] {
unsafe { std::slice::from_raw_parts_mut(self.0.add(offset), len) }
}
}
let ptr = SendSyncPtr(all_data_ptr);
// Pass 3: decode each block into its reserved region, in parallel when there
// are enough blocks. The regions handed out are
// `block_offsets[idx]..block_offsets[idx + 1]`, which do not overlap.
let mut block_sizes: Vec<usize> = if num_blocks >= MIN_BLOCKS_FOR_PAR {
let results: Result<Vec<usize>> = blocks
.par_iter()
.enumerate()
.map(|(idx, &(offset, length, is_uncompressed))| {
let dest_start = block_offsets[idx];
let dest_capacity = block_offsets[idx + 1] - dest_start;
let dest = unsafe { ptr.slice_mut(dest_start, dest_capacity) };
if is_uncompressed {
dest[..length].copy_from_slice(&raw_data[offset..offset + length]);
Ok(length)
} else {
lzf::decompress_block_into(algo, &raw_data[offset..offset + length], dest)
}
})
.collect();
results?
} else {
let results: Result<Vec<usize>> = blocks
.iter()
.enumerate()
.map(|(idx, &(offset, length, is_uncompressed))| {
let dest_start = block_offsets[idx];
let dest_capacity = block_offsets[idx + 1] - dest_start;
let dest = unsafe { ptr.slice_mut(dest_start, dest_capacity) };
if is_uncompressed {
dest[..length].copy_from_slice(&raw_data[offset..offset + length]);
Ok(length)
} else {
lzf::decompress_block_into(algo, &raw_data[offset..offset + length], dest)
}
})
.collect();
results?
};
// Optional filter: zero out the size of blocks that do not look like record
// data so the compaction below skips them.
if let Some(fixed_size) = spatial_record_filter {
for idx in 0..block_sizes.len() {
let actual_size = block_sizes[idx];
if actual_size > 0 {
let start = block_offsets[idx];
if !is_record_block(&all_data[start..start + actual_size], fixed_size) {
block_sizes[idx] = 0;
}
}
}
}
// Pass 4: slide each block's decoded bytes down so the output is contiguous.
// `std::ptr::copy` is used because source and destination may overlap.
let mut write_pos = 0usize;
for (idx, &actual_size) in block_sizes.iter().enumerate() {
let read_pos = block_offsets[idx];
if write_pos != read_pos && actual_size > 0 {
unsafe {
std::ptr::copy(
all_data.as_ptr().add(read_pos),
all_data.as_mut_ptr().add(write_pos),
actual_size,
);
}
}
write_pos += actual_size;
}
all_data.truncate(write_pos);
Ok(all_data)
}
/// Describes where each record lives inside a contiguous data buffer.
enum RecordBounds {
/// Every record is exactly `fixed_size` bytes, laid out back to back.
Fixed { fixed_size: usize },
/// Record `i` spans `ends[i]..ends[i + 1]`.
Variable { ends: Vec<usize> },
}
impl RecordBounds {
    /// Returns the bytes of record `i` within `data`.
    ///
    /// # Errors
    ///
    /// Returns `YxdbError::InvalidFile` when the record's extent lies outside
    /// `data`, when `i` is not a known record, or when computing the extent
    /// would overflow. Invalid indices are reported as errors, not panics.
    #[inline(always)]
    fn record_slice<'a>(&self, data: &'a [u8], i: usize) -> Result<&'a [u8]> {
        match self {
            RecordBounds::Fixed { fixed_size } => {
                let span = i
                    .checked_mul(*fixed_size)
                    .and_then(|start| start.checked_add(*fixed_size).map(|end| (start, end)));
                match span {
                    Some((start, end)) if end <= data.len() => Ok(&data[start..end]),
                    Some((_, end)) => Err(YxdbError::InvalidFile(format!(
                        "record {i} exceeds data bounds: offset {end} > data length {}",
                        data.len()
                    ))),
                    None => Err(YxdbError::InvalidFile(format!(
                        "record {i} exceeds data bounds: offset overflows > data length {}",
                        data.len()
                    ))),
                }
            }
            RecordBounds::Variable { ends } => {
                let next = i.checked_add(1).and_then(|n| ends.get(n));
                let (start, end) = match (ends.get(i), next) {
                    (Some(&start), Some(&end)) => (start, end),
                    _ => {
                        return Err(YxdbError::InvalidFile(format!(
                            "variable record {i} is out of range: only {} records were indexed",
                            ends.len().saturating_sub(1)
                        )));
                    }
                };
                if end > data.len() {
                    return Err(YxdbError::InvalidFile(format!(
                        "variable record {i} exceeds data bounds: offset {end} > data length {}",
                        data.len()
                    )));
                }
                Ok(&data[start..end])
            }
        }
    }

    /// Number of records described by these bounds.
    ///
    /// Fixed layouts report `total` unchanged; variable layouts report the
    /// number of records found by the scan (`0` when `ends` is empty).
    #[inline(always)]
    fn num_records(&self, total: usize) -> usize {
        match self {
            RecordBounds::Fixed { .. } => total,
            RecordBounds::Variable { ends } => ends.len().saturating_sub(1),
        }
    }
}
/// Builds a zero-length `Series` named after `field` with the dtype that the
/// field's type maps to.
fn empty_series_for_field(field: &FieldMeta) -> Series {
    use crate::field::FieldType;

    let dtype = match field.field_type {
        FieldType::Bool => DataType::Boolean,
        // Bytes are widened to 16-bit integers.
        FieldType::Byte | FieldType::Int16 => DataType::Int16,
        FieldType::Int32 => DataType::Int32,
        FieldType::Int64 => DataType::Int64,
        FieldType::Float => DataType::Float32,
        FieldType::Double => DataType::Float64,
        FieldType::FixedDecimal => {
            // Precision is the field width minus one, minus one more when a
            // scale is present; capped at 38. Width <= 1 falls back to 19.
            let digits = if field.size > 1 {
                field.size - 1 - if field.scale > 0 { 1 } else { 0 }
            } else {
                19
            };
            DataType::Decimal(digits.min(38), field.scale)
        }
        FieldType::String | FieldType::WString | FieldType::VString | FieldType::VWString => {
            DataType::String
        }
        FieldType::Date => DataType::Date,
        FieldType::Time => DataType::Time,
        FieldType::DateTime => DataType::Datetime(TimeUnit::Microseconds, None),
        FieldType::Blob | FieldType::SpatialObj => DataType::Binary,
    };
    Series::new_empty(PlSmallStr::from(field.name.as_str()), &dtype)
}
/// Decodes one field from every record in `all_data` into a `Column`.
///
/// The number of rows comes from `bounds`. Any error from locating a record
/// or decoding a value aborts the build.
fn build_column(
    field: &FieldMeta,
    all_data: &[u8],
    bounds: &RecordBounds,
    num_records: usize,
) -> Result<Column> {
    let row_count = bounds.num_records(num_records);
    let mut builder = ColumnBuilder::new(field, row_count);
    (0..row_count).try_for_each(|row| {
        bounds
            .record_slice(all_data, row)
            .and_then(|record| builder.push_from_record(record, field))
    })?;
    builder.into_series(&field.name).map(Column::from)
}
/// Parses a decimal string into an integer scaled by `10^scale`.
///
/// Surrounding whitespace is trimmed and a leading `-` negates the result.
/// Non-digit bytes in the integer part are skipped; in the fractional part
/// they count as `0`. The fraction is truncated or zero-padded to exactly
/// `scale` digits. An empty string yields `0`.
///
/// Values too large for `i128` saturate at `i128::MAX` (or its negation)
/// instead of overflowing, which would panic in debug builds and silently
/// wrap in release builds.
fn parse_decimal_i128(s: &str, scale: usize) -> i128 {
    let s = s.trim();
    if s.is_empty() {
        return 0;
    }
    let (neg, digits) = match s.strip_prefix('-') {
        Some(rest) => (true, rest),
        None => (false, s),
    };
    let (int_part, frac_part) = digits.split_once('.').unwrap_or((digits, ""));

    // Shift the accumulator one decimal place and add a digit, saturating.
    let push_digit = |acc: i128, digit: u8| acc.saturating_mul(10).saturating_add(i128::from(digit));

    let mut result = int_part
        .bytes()
        .filter(u8::is_ascii_digit)
        .fold(0i128, |acc, b| push_digit(acc, b - b'0'));

    let frac = frac_part.as_bytes();
    for i in 0..scale {
        let digit = match frac.get(i) {
            Some(b) if b.is_ascii_digit() => b - b'0',
            // Missing or non-digit positions contribute a zero.
            _ => 0,
        };
        result = push_digit(result, digit);
    }

    if neg {
        -result
    } else {
        result
    }
}
/// Accumulates the values of one output column, one record at a time.
///
/// Most variants hold a dense `values` vector, a `validity` bitmap with one
/// bit per pushed value, and a `has_nulls` flag that is set once any null has
/// been pushed.
enum ColumnBuilder {
/// Boolean column.
Bool {
values: Vec<bool>,
validity: MutableBitmap,
has_nulls: bool,
},
/// Byte column; values are stored widened to `i16`.
Byte {
values: Vec<i16>,
validity: MutableBitmap,
has_nulls: bool,
},
/// 16-bit integer column.
Int16 {
values: Vec<i16>,
validity: MutableBitmap,
has_nulls: bool,
},
/// 32-bit integer column.
Int32 {
values: Vec<i32>,
validity: MutableBitmap,
has_nulls: bool,
},
/// 64-bit integer column.
Int64 {
values: Vec<i64>,
validity: MutableBitmap,
has_nulls: bool,
},
/// 32-bit float column.
Float {
values: Vec<f32>,
validity: MutableBitmap,
has_nulls: bool,
},
/// 64-bit float column.
Double {
values: Vec<f64>,
validity: MutableBitmap,
has_nulls: bool,
},
/// Fixed-decimal column; values are integers scaled by `10^scale`.
Decimal {
values: Vec<i128>,
validity: MutableBitmap,
has_nulls: bool,
precision: usize,
scale: usize,
},
/// String column (used for all four string field types); `str_buf` is a
/// reusable scratch buffer for UTF-16 transcoding.
StrBuilder {
builder: StringChunkedBuilder,
str_buf: String,
},
/// Date column; values are day counts produced by `parse_date_to_days`.
DateDays {
values: Vec<i32>,
validity: MutableBitmap,
has_nulls: bool,
},
/// Time column; values are nanoseconds produced by `parse_time_to_ns`.
TimeNs {
values: Vec<i64>,
validity: MutableBitmap,
has_nulls: bool,
},
/// Datetime column; values are microseconds produced by `parse_datetime_to_us`.
DateTimeUs {
values: Vec<i64>,
validity: MutableBitmap,
has_nulls: bool,
},
/// Binary column (blob and spatial-object fields); `None` marks a null.
Blob(Vec<Option<Vec<u8>>>),
}
/// Offset subtracted by `civil_to_days` so that 1970-01-01 maps to day 0.
const UNIX_EPOCH_DAYS: i32 = 719_468;
/// Parses a `YYYY?MM?DD` date (digits at byte positions 0-3, 5-6 and 8-9) into
/// days since 1970-01-01.
///
/// Returns `None` when the buffer is shorter than 10 bytes, when any digit
/// position is not an ASCII digit, or when the month/day do not name a real
/// calendar date. Without the range check, values such as month `13` produced
/// meaningless day counts and day `00` could underflow inside
/// `civil_to_days`.
#[inline]
fn parse_date_to_days(buf: &[u8]) -> Option<i32> {
    if buf.len() < 10 {
        return None;
    }
    let y = i32::from(parse_4_digits(buf)?);
    let m = u32::from(parse_2_digits(&buf[5..])?);
    let d = u32::from(parse_2_digits(&buf[8..])?);

    let is_leap = (y % 4 == 0 && y % 100 != 0) || y % 400 == 0;
    let days_in_month = match m {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        4 | 6 | 9 | 11 => 30,
        2 if is_leap => 29,
        2 => 28,
        _ => return None,
    };
    if d == 0 || d > days_in_month {
        return None;
    }
    Some(civil_to_days(y, m, d))
}
/// Parses an `HH?MM?SS` time (digits at byte positions 0-1, 3-4 and 6-7) into
/// nanoseconds since midnight.
///
/// Returns `None` when the buffer is shorter than 8 bytes, when a digit
/// position is not an ASCII digit, or when the components fall outside
/// `00..=23`, `00..=59`, `00..=59`. Without the range check, text such as
/// `99:99:99` produced a value beyond the end of the day.
#[inline]
fn parse_time_to_ns(buf: &[u8]) -> Option<i64> {
    if buf.len() < 8 {
        return None;
    }
    let h = i64::from(parse_2_digits(buf)?);
    let m = i64::from(parse_2_digits(&buf[3..])?);
    let s = i64::from(parse_2_digits(&buf[6..])?);
    if h > 23 || m > 59 || s > 59 {
        return None;
    }
    Some((h * 3600 + m * 60 + s) * 1_000_000_000)
}
/// Parses a `YYYY?MM?DD?HH?MM?SS` timestamp (19 bytes, digits at fixed
/// positions) into microseconds since 1970-01-01 00:00:00.
///
/// Returns `None` when the buffer is shorter than 19 bytes, when the date part
/// is rejected by `parse_date_to_days`, when a time digit position is not an
/// ASCII digit, or when the time of day lies outside `00:00:00..=23:59:59`.
/// Without the range check, out-of-range components silently spilled into the
/// following day(s).
#[inline]
fn parse_datetime_to_us(buf: &[u8]) -> Option<i64> {
    if buf.len() < 19 {
        return None;
    }
    let days = i64::from(parse_date_to_days(buf)?);
    let h = i64::from(parse_2_digits(&buf[11..])?);
    let min = i64::from(parse_2_digits(&buf[14..])?);
    let s = i64::from(parse_2_digits(&buf[17..])?);
    if h > 23 || min > 59 || s > 59 {
        return None;
    }
    Some(days * 86_400_000_000 + h * 3_600_000_000 + min * 60_000_000 + s * 1_000_000)
}
/// Parses the first two bytes of `b` as a two-digit decimal number.
///
/// Returns `None` when either byte is not an ASCII digit or when `b` holds
/// fewer than two bytes (short input is no longer a panic).
#[inline]
fn parse_2_digits(b: &[u8]) -> Option<u16> {
    match b {
        [tens, ones, ..] if tens.is_ascii_digit() && ones.is_ascii_digit() => {
            Some(u16::from(tens - b'0') * 10 + u16::from(ones - b'0'))
        }
        _ => None,
    }
}
/// Parses the first four bytes of `b` as a four-digit decimal number.
///
/// Returns `None` when any of those bytes is not an ASCII digit or when `b`
/// holds fewer than four bytes (short input is no longer a panic).
#[inline]
fn parse_4_digits(b: &[u8]) -> Option<u16> {
    let head = b.get(..4)?;
    head.iter().try_fold(0u16, |acc, &byte| {
        if byte.is_ascii_digit() {
            Some(acc * 10 + u16::from(byte - b'0'))
        } else {
            None
        }
    })
}
/// Converts a calendar date to a day count where 1970-01-01 is day 0.
///
/// The year is treated as starting in March, so January and February belong
/// to the previous computational year. `m` is the 1-based month and `d` the
/// 1-based day; callers are expected to pass values that form a real date.
#[inline]
fn civil_to_days(y: i32, m: u32, d: u32) -> i32 {
    // Shift to a March-based year: March is month index 0, February is 11.
    let (year, month_index) = if m > 2 { (y, m - 3) } else { (y - 1, m + 9) };

    // 400-year cycles, rounded toward negative infinity.
    let era = year.div_euclid(400);
    let year_of_era = (year - era * 400) as u32;

    let day_of_year = (153 * month_index + 2) / 5 + d - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;

    (era * 146097 + day_of_era as i32) - UNIX_EPOCH_DAYS
}
/// Copies `N` bytes starting at `buf[off]` into an array without bounds checks.
///
/// # Safety
///
/// The caller must guarantee that `off + N <= buf.len()`.
#[inline(always)]
unsafe fn read_bytes_unchecked<const N: usize>(buf: &[u8], off: usize) -> [u8; N] {
    // SAFETY: the caller guarantees the `N` bytes at `off` are in bounds, and
    // `[u8; N]` has alignment 1, so an unaligned read of the array is valid.
    unsafe { buf.as_ptr().add(off).cast::<[u8; N]>().read_unaligned() }
}
/// Transcodes UTF-16LE `bytes` into UTF-8, replacing the contents of `out`.
///
/// Unpaired surrogates are replaced with U+FFFD. A trailing odd byte is
/// ignored. On x86_64, runs of eight ASCII code units are converted with SSE2
/// before falling back to the scalar decoder.
///
/// An unpaired high surrogate no longer swallows the code unit that follows
/// it: `[0xD800, 0x0041]` now decodes to `"\u{FFFD}A"`; previously the `A` was
/// dropped.
#[inline]
fn transcode_utf16le(bytes: &[u8], out: &mut String) {
    out.clear();
    out.reserve(bytes.len() / 2);
    let mut i = 0;

    #[cfg(target_arch = "x86_64")]
    {
        use std::arch::x86_64::*;
        // SAFETY: SSE2 is part of the x86_64 baseline. Each 16-byte load is
        // guarded by `i + 16 <= bytes.len()`. Every write into `v` appends a
        // complete, valid UTF-8 sequence (pure ASCII in the vector path, a
        // full 1-3 byte encoding of a non-surrogate code unit otherwise), so
        // `out` remains valid UTF-8. The raw 8-byte copy is preceded by
        // `reserve(8)`.
        unsafe {
            let v = out.as_mut_vec();
            let zero = _mm_setzero_si128();
            let hi_byte_mask = _mm_set1_epi16(0xFF00u16 as i16);
            while i + 16 <= bytes.len() {
                let chunk = _mm_loadu_si128(bytes.as_ptr().add(i) as *const __m128i);
                // No byte has its top bit set ...
                let byte_mask = _mm_movemask_epi8(chunk);
                // ... and every high byte of the eight code units is zero.
                let hi_bytes = _mm_and_si128(chunk, hi_byte_mask);
                let hi_is_zero = _mm_cmpeq_epi8(hi_bytes, zero);
                let hi_mask = _mm_movemask_epi8(hi_is_zero);
                if byte_mask == 0 && hi_mask == 0xFFFF {
                    // Eight ASCII code units: narrow to eight bytes at once.
                    let packed = _mm_packus_epi16(chunk, zero);
                    v.reserve(8);
                    let len = v.len();
                    std::ptr::copy_nonoverlapping(
                        &packed as *const __m128i as *const u8,
                        v.as_mut_ptr().add(len),
                        8,
                    );
                    v.set_len(len + 8);
                    i += 16;
                } else {
                    let lo = bytes[i];
                    let hi = bytes[i + 1];
                    let cu = u16::from_le_bytes([lo, hi]);
                    if (0xD800..=0xDFFF).contains(&cu) {
                        // Surrogates are handled by the scalar decoder below.
                        break;
                    }
                    if hi == 0 && lo < 0x80 {
                        v.push(lo);
                    } else if cu < 0x800 {
                        v.push(0xC0 | ((cu >> 6) as u8));
                        v.push(0x80 | ((cu & 0x3F) as u8));
                    } else {
                        v.push(0xE0 | ((cu >> 12) as u8));
                        v.push(0x80 | (((cu >> 6) & 0x3F) as u8));
                        v.push(0x80 | ((cu & 0x3F) as u8));
                    }
                    i += 2;
                }
            }
        }
    }

    // Scalar decoder for whatever the vector path did not consume.
    while i + 1 < bytes.len() {
        let cu = u16::from_le_bytes([bytes[i], bytes[i + 1]]);
        i += 2;
        let ch = match cu {
            0xD800..=0xDBFF => {
                // High surrogate: only consume the next unit if it completes the pair.
                let low = bytes
                    .get(i..i + 2)
                    .map(|pair| u16::from_le_bytes([pair[0], pair[1]]))
                    .filter(|next| (0xDC00..=0xDFFF).contains(next));
                match low {
                    Some(cu2) => {
                        i += 2;
                        let cp = 0x10000
                            + ((u32::from(cu) - 0xD800) << 10)
                            + (u32::from(cu2) - 0xDC00);
                        char::from_u32(cp).unwrap_or(char::REPLACEMENT_CHARACTER)
                    }
                    None => char::REPLACEMENT_CHARACTER,
                }
            }
            // Low surrogate with no preceding high surrogate.
            0xDC00..=0xDFFF => char::REPLACEMENT_CHARACTER,
            _ => char::from_u32(u32::from(cu)).unwrap_or(char::REPLACEMENT_CHARACTER),
        };
        out.push(ch);
    }
}
impl ColumnBuilder {
fn new(field: &FieldMeta, capacity: usize) -> Self {
match field.field_type {
FieldType::Bool => ColumnBuilder::Bool {
values: Vec::with_capacity(capacity),
validity: MutableBitmap::with_capacity(capacity),
has_nulls: false,
},
FieldType::Byte => ColumnBuilder::Byte {
values: Vec::with_capacity(capacity),
validity: MutableBitmap::with_capacity(capacity),
has_nulls: false,
},
FieldType::Int16 => ColumnBuilder::Int16 {
values: Vec::with_capacity(capacity),
validity: MutableBitmap::with_capacity(capacity),
has_nulls: false,
},
FieldType::Int32 => ColumnBuilder::Int32 {
values: Vec::with_capacity(capacity),
validity: MutableBitmap::with_capacity(capacity),
has_nulls: false,
},
FieldType::Int64 => ColumnBuilder::Int64 {
values: Vec::with_capacity(capacity),
validity: MutableBitmap::with_capacity(capacity),
has_nulls: false,
},
FieldType::Float => ColumnBuilder::Float {
values: Vec::with_capacity(capacity),
validity: MutableBitmap::with_capacity(capacity),
has_nulls: false,
},
FieldType::Double => ColumnBuilder::Double {
values: Vec::with_capacity(capacity),
validity: MutableBitmap::with_capacity(capacity),
has_nulls: false,
},
FieldType::FixedDecimal => ColumnBuilder::Decimal {
values: Vec::with_capacity(capacity),
validity: MutableBitmap::with_capacity(capacity),
has_nulls: false,
precision: {
let raw_p = if field.size > 1 {
field.size - 1 - if field.scale > 0 { 1 } else { 0 }
} else {
19
};
raw_p.min(38)
},
scale: field.scale,
},
FieldType::String | FieldType::WString | FieldType::VString | FieldType::VWString => {
ColumnBuilder::StrBuilder {
builder: StringChunkedBuilder::new(
PlSmallStr::from(field.name.as_str()),
capacity,
),
str_buf: String::with_capacity(64),
}
}
FieldType::Date => ColumnBuilder::DateDays {
values: Vec::with_capacity(capacity),
validity: MutableBitmap::with_capacity(capacity),
has_nulls: false,
},
FieldType::Time => ColumnBuilder::TimeNs {
values: Vec::with_capacity(capacity),
validity: MutableBitmap::with_capacity(capacity),
has_nulls: false,
},
FieldType::DateTime => ColumnBuilder::DateTimeUs {
values: Vec::with_capacity(capacity),
validity: MutableBitmap::with_capacity(capacity),
has_nulls: false,
},
FieldType::Blob | FieldType::SpatialObj => {
ColumnBuilder::Blob(Vec::with_capacity(capacity))
}
}
}
#[inline]
#[allow(clippy::undocumented_unsafe_blocks)]
fn push_from_record(&mut self, record: &[u8], field: &FieldMeta) -> Result<()> {
let off = field.offset;
match self {
ColumnBuilder::Bool {
values,
validity,
has_nulls,
} => {
let b = unsafe { *record.get_unchecked(off) };
if b == 2 {
values.push(false);
validity.push(false);
*has_nulls = true;
} else {
values.push(b == 1);
validity.push(true);
}
}
ColumnBuilder::Byte {
values,
validity,
has_nulls,
} => unsafe {
if *record.get_unchecked(off + 1) == 1 {
values.push(0);
validity.push(false);
*has_nulls = true;
} else {
values.push(*record.get_unchecked(off) as i16);
validity.push(true);
}
},
ColumnBuilder::Int16 {
values,
validity,
has_nulls,
} => unsafe {
if *record.get_unchecked(off + 2) == 1 {
values.push(0);
validity.push(false);
*has_nulls = true;
} else {
values.push(i16::from_le_bytes(read_bytes_unchecked::<2>(record, off)));
validity.push(true);
}
},
ColumnBuilder::Int32 {
values,
validity,
has_nulls,
} => unsafe {
if *record.get_unchecked(off + 4) == 1 {
values.push(0);
validity.push(false);
*has_nulls = true;
} else {
values.push(i32::from_le_bytes(read_bytes_unchecked::<4>(record, off)));
validity.push(true);
}
},
ColumnBuilder::Int64 {
values,
validity,
has_nulls,
} => unsafe {
if *record.get_unchecked(off + 8) == 1 {
values.push(0);
validity.push(false);
*has_nulls = true;
} else {
values.push(i64::from_le_bytes(read_bytes_unchecked::<8>(record, off)));
validity.push(true);
}
},
ColumnBuilder::Float {
values,
validity,
has_nulls,
} => unsafe {
if *record.get_unchecked(off + 4) == 1 {
values.push(0.0);
validity.push(false);
*has_nulls = true;
} else {
values.push(f32::from_le_bytes(read_bytes_unchecked::<4>(record, off)));
validity.push(true);
}
},
ColumnBuilder::Double {
values,
validity,
has_nulls,
} => unsafe {
if *record.get_unchecked(off + 8) == 1 {
values.push(0.0);
validity.push(false);
*has_nulls = true;
} else {
values.push(f64::from_le_bytes(read_bytes_unchecked::<8>(record, off)));
validity.push(true);
}
},
ColumnBuilder::Decimal {
values,
validity,
has_nulls,
scale,
..
} => unsafe {
if *record.get_unchecked(off + field.size) == 1 {
values.push(0);
validity.push(false);
*has_nulls = true;
} else {
let slice = &record[off..off + field.size];
let len = slice.iter().position(|&b| b == 0).unwrap_or(field.size);
let s = std::str::from_utf8(&slice[..len]).unwrap_or("0");
values.push(parse_decimal_i128(s, *scale));
validity.push(true);
}
},
ColumnBuilder::StrBuilder { builder, str_buf } => match field.field_type {
FieldType::String => unsafe {
if *record.get_unchecked(off + field.size) == 1 {
builder.append_null();
} else {
let slice = &record[off..off + field.size];
let len = slice.iter().position(|&b| b == 0).unwrap_or(field.size);
match std::str::from_utf8(&slice[..len]) {
Ok(s) => builder.append_value(s),
Err(_) => {
let cow = String::from_utf8_lossy(&slice[..len]);
builder.append_value(cow.as_ref());
}
}
}
},
FieldType::WString => {
let null_byte_off = off + field.size * 2;
unsafe {
if *record.get_unchecked(null_byte_off) == 1 {
builder.append_null();
} else {
let byte_len = field.size * 2;
let slice = &record[off..off + byte_len];
let char_count = slice
.chunks_exact(2)
.position(|c| c[0] == 0 && c[1] == 0)
.unwrap_or(field.size);
transcode_utf16le(&slice[..char_count * 2], str_buf);
builder.append_value(str_buf.as_str());
}
}
}
FieldType::VString => match record::locate_var_data(record, off) {
None => builder.append_null(),
Some([]) => builder.append_value(""),
Some(bytes) => match std::str::from_utf8(bytes) {
Ok(s) => builder.append_value(s),
Err(_) => {
let cow = String::from_utf8_lossy(bytes);
builder.append_value(cow.as_ref());
}
},
},
FieldType::VWString => match record::locate_var_data(record, off) {
None => builder.append_null(),
Some([]) => builder.append_value(""),
Some(bytes) => {
transcode_utf16le(bytes, str_buf);
builder.append_value(str_buf.as_str());
}
},
_ => unreachable!(),
},
ColumnBuilder::DateDays {
values,
validity,
has_nulls,
} => unsafe {
if *record.get_unchecked(off + 10) == 1 {
values.push(0);
validity.push(false);
*has_nulls = true;
} else {
match parse_date_to_days(&record[off..off + 10]) {
Some(v) => {
values.push(v);
validity.push(true);
}
None => {
values.push(0);
validity.push(false);
*has_nulls = true;
}
}
}
},
ColumnBuilder::TimeNs {
values,
validity,
has_nulls,
} => unsafe {
if *record.get_unchecked(off + 8) == 1 {
values.push(0);
validity.push(false);
*has_nulls = true;
} else {
match parse_time_to_ns(&record[off..off + 8]) {
Some(v) => {
values.push(v);
validity.push(true);
}
None => {
values.push(0);
validity.push(false);
*has_nulls = true;
}
}
}
},
ColumnBuilder::DateTimeUs {
values,
validity,
has_nulls,
} => unsafe {
if *record.get_unchecked(off + 19) == 1 {
values.push(0);
validity.push(false);
*has_nulls = true;
} else {
match parse_datetime_to_us(&record[off..off + 19]) {
Some(v) => {
values.push(v);
validity.push(true);
}
None => {
values.push(0);
validity.push(false);
*has_nulls = true;
}
}
}
},
ColumnBuilder::Blob(v) => {
v.push(record::parse_var_data(record, off));
}
}
Ok(())
}
fn into_series(self, name: &str) -> Result<Series> {
let s = match self {
ColumnBuilder::Bool {
values,
validity,
has_nulls,
} => {
if has_nulls {
let bitmap: arrow::bitmap::Bitmap = validity.into();
let values_bitmap = arrow::bitmap::Bitmap::from_iter(values.iter().copied());
let arr =
arrow::array::BooleanArray::from_data_default(values_bitmap, Some(bitmap));
let ca = BooleanChunked::with_chunk(name.into(), arr);
ca.into_series()
} else {
BooleanChunked::new(name.into(), &values).into_series()
}
}
ColumnBuilder::Byte {
values,
validity,
has_nulls,
} => {
if has_nulls {
Int16Chunked::from_vec_validity(name.into(), values, Some(validity.into()))
.into_series()
} else {
Int16Chunked::from_vec(name.into(), values).into_series()
}
}
ColumnBuilder::Int16 {
values,
validity,
has_nulls,
} => {
if has_nulls {
Int16Chunked::from_vec_validity(name.into(), values, Some(validity.into()))
.into_series()
} else {
Int16Chunked::from_vec(name.into(), values).into_series()
}
}
ColumnBuilder::Int32 {
values,
validity,
has_nulls,
} => {
if has_nulls {
Int32Chunked::from_vec_validity(name.into(), values, Some(validity.into()))
.into_series()
} else {
Int32Chunked::from_vec(name.into(), values).into_series()
}
}
ColumnBuilder::Int64 {
values,
validity,
has_nulls,
} => {
if has_nulls {
Int64Chunked::from_vec_validity(name.into(), values, Some(validity.into()))
.into_series()
} else {
Int64Chunked::from_vec(name.into(), values).into_series()
}
}
ColumnBuilder::Float {
values,
validity,
has_nulls,
} => {
if has_nulls {
Float32Chunked::from_vec_validity(name.into(), values, Some(validity.into()))
.into_series()
} else {
Float32Chunked::from_vec(name.into(), values).into_series()
}
}
ColumnBuilder::Double {
values,
validity,
has_nulls,
} => {
if has_nulls {
Float64Chunked::from_vec_validity(name.into(), values, Some(validity.into()))
.into_series()
} else {
Float64Chunked::from_vec(name.into(), values).into_series()
}
}
ColumnBuilder::Decimal {
values,
validity,
has_nulls,
precision,
scale,
} => {
let ca = if has_nulls {
Int128Chunked::from_vec_validity(name.into(), values, Some(validity.into()))
} else {
Int128Chunked::from_vec(name.into(), values)
};
ca.into_decimal_unchecked(precision, scale).into_series()
}
ColumnBuilder::StrBuilder { builder, .. } => builder.finish().into_series(),
ColumnBuilder::DateDays {
values,
validity,
has_nulls,
} => {
let ca = if has_nulls {
Int32Chunked::from_vec_validity(name.into(), values, Some(validity.into()))
} else {
Int32Chunked::from_vec(name.into(), values)
};
ca.into_date().into_series()
}
ColumnBuilder::TimeNs {
values,
validity,
has_nulls,
} => {
let ca = if has_nulls {
Int64Chunked::from_vec_validity(name.into(), values, Some(validity.into()))
} else {
Int64Chunked::from_vec(name.into(), values)
};
ca.into_time().into_series()
}
ColumnBuilder::DateTimeUs {
values,
validity,
has_nulls,
} => {
let ca = if has_nulls {
Int64Chunked::from_vec_validity(name.into(), values, Some(validity.into()))
} else {
Int64Chunked::from_vec(name.into(), values)
};
ca.into_datetime(TimeUnit::Microseconds, None).into_series()
}
ColumnBuilder::Blob(v) => {
let values: Vec<Option<&[u8]>> = v.iter().map(|opt| opt.as_deref()).collect();
Series::new(name.into(), values)
}
};
Ok(s)
}
}
/// Row-at-a-time reader over a YXDB file.
///
/// Wraps a [`YxdbReader`] and keeps the bytes of the most recently read record
/// so that individual fields can be extracted by index or by name.
pub struct YxdbRowReader {
/// Underlying reader that supplies the records.
inner: YxdbReader,
/// Buffer passed to `inner.next_record`; holds the current record's bytes.
record_buf: Vec<u8>,
/// Maps a field name to its position in `inner.fields`.
name_map: HashMap<String, usize>,
/// Result of the most recent `next()` call: `true` when a record is available.
has_current: bool,
}
impl YxdbRowReader {
    /// Opens the YXDB file at `path` and returns a reader with no current record.
    ///
    /// A name -> index lookup table is built from the file's field list; if two
    /// fields share a name, the later one wins (map insertion semantics).
    ///
    /// # Errors
    /// Propagates any error returned by [`YxdbReader::open`].
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let inner = YxdbReader::open(path)?;
        let name_map: HashMap<String, usize> = inner
            .fields
            .iter()
            .enumerate()
            .map(|(i, f)| (f.name.clone(), i))
            .collect();
        // Fixed part plus 1 KiB of headroom (presumably for variable-length data).
        let capacity = inner.fixed_size + 1024;
        Ok(YxdbRowReader {
            inner,
            record_buf: Vec::with_capacity(capacity),
            name_map,
            has_current: false,
        })
    }

    /// Advances to the next record.
    ///
    /// Returns `Ok(true)` when a record was read and `Ok(false)` when the
    /// underlying reader reports that there are no more records.
    ///
    /// # Errors
    /// Propagates any error from the underlying reader. After an error the
    /// reader has no current record, so `read_*` calls fail instead of decoding
    /// a stale or partially overwritten buffer.
    // Deliberately not `Iterator::next`: this returns `Result<bool>` and exposes
    // the record through `read_*` rather than yielding an item.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Result<bool> {
        // Invalidate first: `?` below returns early on failure, which would
        // otherwise leave the previous `true` in place.
        self.has_current = false;
        self.has_current = self.inner.next_record(&mut self.record_buf)?;
        Ok(self.has_current)
    }

    /// Extracts the field at position `index` from the current record.
    ///
    /// # Errors
    /// Returns `YxdbError::ConversionError` when there is no current record, and
    /// propagates any error from `record::extract_field_index`.
    pub fn read_index(&self, index: usize) -> Result<FieldValue> {
        self.ensure_current()?;
        record::extract_field_index(&self.record_buf, &self.inner.fields, index)
    }

    /// Extracts the field called `name` from the current record.
    ///
    /// # Errors
    /// Returns `YxdbError::ConversionError` when `name` is not a known field or
    /// when there is no current record; otherwise behaves like [`Self::read_index`].
    pub fn read_name(&self, name: &str) -> Result<FieldValue> {
        let index = self
            .name_map
            .get(name)
            .ok_or_else(|| YxdbError::ConversionError(format!("unknown field name: {name}")))?;
        self.read_index(*index)
    }

    /// Extracts every field of the current record, in field order.
    ///
    /// # Errors
    /// Returns `YxdbError::ConversionError` when there is no current record, and
    /// propagates any error from `record::extract_all_fields`.
    pub fn read_all(&self) -> Result<Vec<FieldValue>> {
        self.ensure_current()?;
        record::extract_all_fields(&self.record_buf, &self.inner.fields)
    }

    /// Number of records declared in the file header.
    pub fn num_records(&self) -> u64 {
        self.inner.header.num_records
    }

    /// Field metadata of the underlying file.
    pub fn fields(&self) -> &[FieldMeta] {
        &self.inner.fields
    }

    /// Fails unless the last `next()` call produced a record.
    fn ensure_current(&self) -> Result<()> {
        if self.has_current {
            Ok(())
        } else {
            Err(YxdbError::ConversionError(
                "no current record -- call next() first".into(),
            ))
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Builds a `DataFrame` from `columns`, using the first column's length as the
/// height (0 when there are no columns). Panics if construction fails.
fn test_df(columns: Vec<Column>) -> DataFrame {
    let height = match columns.first() {
        Some(first) => first.len(),
        None => 0,
    };
    DataFrame::new(height, columns).unwrap()
}
/// Returns `<CARGO_MANIFEST_DIR>/test_files/<name>` as a string.
fn test_path(name: &str) -> String {
    let crate_root = env!("CARGO_MANIFEST_DIR");
    [crate_root, "test_files", name].join("/")
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn all_types_shape() {
let df =
crate::read_yxdb(test_path("AllTypes.yxdb"), crate::SpatialMode::Raw, false).unwrap();
assert_eq!(df.height(), 2);
assert_eq!(df.width(), 16);
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn all_types_integer_values() {
let df =
crate::read_yxdb(test_path("AllTypes.yxdb"), crate::SpatialMode::Raw, false).unwrap();
let byte_col = df.column("ByteCol").unwrap().i16().unwrap();
assert_eq!(byte_col.get(0), Some(7));
assert_eq!(byte_col.get(1), Some(255));
let i16_col = df.column("Int16Col").unwrap().i16().unwrap();
assert_eq!(i16_col.get(0), Some(-1234));
assert_eq!(i16_col.get(1), Some(32767));
let i32_col = df.column("Int32Col").unwrap().i32().unwrap();
assert_eq!(i32_col.get(0), Some(42000));
assert_eq!(i32_col.get(1), Some(-1));
let i64_col = df.column("Int64Col").unwrap().i64().unwrap();
assert_eq!(i64_col.get(0), Some(9_000_000_000));
assert_eq!(i64_col.get(1), Some(-9_000_000_000));
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn all_types_bool_values() {
let df =
crate::read_yxdb(test_path("AllTypes.yxdb"), crate::SpatialMode::Raw, false).unwrap();
let col = df.column("BoolCol").unwrap().bool().unwrap();
assert_eq!(col.get(0), Some(true));
assert_eq!(col.get(1), Some(false));
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn all_types_float_values() {
let df =
crate::read_yxdb(test_path("AllTypes.yxdb"), crate::SpatialMode::Raw, false).unwrap();
let f32_col = df.column("FloatCol").unwrap().f32().unwrap();
assert!((f32_col.get(0).unwrap() - 2.5).abs() < 0.01);
let f64_col = df.column("DoubleCol").unwrap().f64().unwrap();
assert!((f64_col.get(0).unwrap() - std::f64::consts::PI).abs() < 1e-10);
assert!((f64_col.get(1).unwrap() - 0.0).abs() < 1e-10);
let dec_col = df.column("DecimalCol").unwrap();
assert!(matches!(dec_col.dtype(), DataType::Decimal(_, _)));
let dec_ca = dec_col.decimal().unwrap();
assert_eq!(dec_ca.phys.get(0), Some(12345678i128));
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn all_types_string_values() {
let df =
crate::read_yxdb(test_path("AllTypes.yxdb"), crate::SpatialMode::Raw, false).unwrap();
let str_col = df.column("StringCol").unwrap().str().unwrap();
assert_eq!(str_col.get(0), Some("Alteryx"));
let wstr_col = df.column("WStringCol").unwrap().str().unwrap();
assert_eq!(wstr_col.get(0), Some("Ünïcödé"));
let vstr_col = df.column("VStringCol").unwrap().str().unwrap();
assert_eq!(vstr_col.get(0), Some("short var"));
let vwstr_col = df.column("VWStringCol").unwrap().str().unwrap();
let row0 = vwstr_col.get(0).unwrap();
assert_eq!(row0.len(), 600);
assert!(row0.chars().all(|c| c == 'x'));
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn all_types_date_time_values() {
let df =
crate::read_yxdb(test_path("AllTypes.yxdb"), crate::SpatialMode::Raw, false).unwrap();
let date_col = df.column("DateCol").unwrap().date().unwrap();
let expected_date = chrono_date_to_days(2025, 3, 15);
assert_eq!(date_col.phys.get(0), Some(expected_date));
let dt_col = df.column("DateTimeCol").unwrap().datetime().unwrap();
let expected_dt = chrono_date_to_days(2025, 3, 15) as i64 * 86_400_000_000
+ 8 * 3_600_000_000
+ 30 * 60_000_000;
assert_eq!(dt_col.phys.get(0), Some(expected_dt));
let time_col = df.column("TimeCol").unwrap();
assert_eq!(time_col.dtype(), &DataType::Time);
assert!(!time_col.is_null().any());
}
/// Converts the proleptic Gregorian date `y`-`m`-`d` into days relative to
/// 1970-01-01 (negative for earlier dates).
///
/// The year is shifted so that it starts in March, which places the leap day
/// at the end of the shifted year; 400-year eras of 146 097 days do the rest.
fn chrono_date_to_days(y: i32, m: u32, d: u32) -> i32 {
    let (shifted_year, shifted_month) = if m > 2 { (y, m - 3) } else { (y - 1, m + 9) };
    // Floor division by 400 that also works for negative years.
    let era = (if shifted_year >= 0 {
        shifted_year
    } else {
        shifted_year - 399
    }) / 400;
    let year_of_era = (shifted_year - era * 400) as u32;
    let day_of_year = (153 * shifted_month + 2) / 5 + d - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    (era * 146097 + day_of_era as i32) - 719_468
}
/// Blob column: row 0 is 1024 bytes cycling 0x00..=0xFF, row 1 is 512 bytes of 0xFF.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn all_types_blob_values() {
    let frame =
        crate::read_yxdb(test_path("AllTypes.yxdb"), crate::SpatialMode::Raw, false).unwrap();
    let blobs = frame.column("BlobCol").unwrap().binary().unwrap();

    let first = blobs.get(0).unwrap();
    assert_eq!(first.len(), 1024);
    // Spot-check the start of the pattern and the wrap-around after 0xFF.
    for (offset, expected) in [(0usize, 0x00u8), (1, 0x01), (255, 0xFF), (256, 0x00)] {
        assert_eq!(first[offset], expected);
    }

    let second = blobs.get(1).unwrap();
    assert_eq!(second.len(), 512);
    assert!(second.iter().all(|&byte| byte == 0xFF));
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn null_values_populated_row() {
let df =
crate::read_yxdb(test_path("NullValues.yxdb"), crate::SpatialMode::Raw, false).unwrap();
assert_eq!(df.height(), 3);
let id_col = df.column("Id").unwrap().i32().unwrap();
assert_eq!(id_col.get(0), Some(1));
let str_col = df.column("NullStr").unwrap().str().unwrap();
assert_eq!(str_col.get(0), Some("hello"));
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn null_values_all_null_row() {
let df =
crate::read_yxdb(test_path("NullValues.yxdb"), crate::SpatialMode::Raw, false).unwrap();
let id_col = df.column("Id").unwrap().i32().unwrap();
assert_eq!(id_col.get(1), Some(2));
assert!(df
.column("NullByte")
.unwrap()
.i16()
.unwrap()
.get(1)
.is_none());
assert!(df
.column("NullInt16")
.unwrap()
.i16()
.unwrap()
.get(1)
.is_none());
assert!(df
.column("NullInt32")
.unwrap()
.i32()
.unwrap()
.get(1)
.is_none());
assert!(df
.column("NullInt64")
.unwrap()
.i64()
.unwrap()
.get(1)
.is_none());
assert!(df
.column("NullFloat")
.unwrap()
.f32()
.unwrap()
.get(1)
.is_none());
assert!(df
.column("NullDouble")
.unwrap()
.f64()
.unwrap()
.get(1)
.is_none());
assert!(df
.column("NullStr")
.unwrap()
.str()
.unwrap()
.get(1)
.is_none());
assert!(df
.column("NullBlob")
.unwrap()
.binary()
.unwrap()
.get(1)
.is_none());
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn null_values_mixed_row() {
let df =
crate::read_yxdb(test_path("NullValues.yxdb"), crate::SpatialMode::Raw, false).unwrap();
assert!(df
.column("NullByte")
.unwrap()
.i16()
.unwrap()
.get(2)
.is_none());
let i16_col = df.column("NullInt16").unwrap().i16().unwrap();
assert_eq!(i16_col.get(2), Some(50));
assert!(df
.column("NullInt32")
.unwrap()
.i32()
.unwrap()
.get(2)
.is_none());
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn many_records_shape() {
let df = crate::read_yxdb(
test_path("ManyRecords.yxdb"),
crate::SpatialMode::Raw,
false,
)
.unwrap();
assert_eq!(df.height(), 50_000);
assert_eq!(df.width(), 3);
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn many_records_id_sum() {
let df = crate::read_yxdb(
test_path("ManyRecords.yxdb"),
crate::SpatialMode::Raw,
false,
)
.unwrap();
let id_col = df.column("Id").unwrap().i32().unwrap();
let id_sum: i64 = id_col.into_iter().map(|v| v.unwrap_or(0) as i64).sum();
assert_eq!(id_sum, 1_250_025_000);
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn many_records_label_check() {
let df = crate::read_yxdb(
test_path("ManyRecords.yxdb"),
crate::SpatialMode::Raw,
false,
)
.unwrap();
let label_col = df.column("Label").unwrap().str().unwrap();
assert_eq!(label_col.get(0), Some("row_00001"));
assert_eq!(label_col.get(49_999), Some("row_50000"));
}
/// `LargeBlob.yxdb` holds four rows: 512 000 bytes, null, `b"tiny"`, 500 000 bytes.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn large_blob_sizes() {
    let frame =
        crate::read_yxdb(test_path("LargeBlob.yxdb"), crate::SpatialMode::Raw, false).unwrap();
    assert_eq!(frame.height(), 4);

    let payloads = frame.column("Data").unwrap().binary().unwrap();
    let first_len = payloads.get(0).unwrap().len();
    assert_eq!(first_len, 512_000);
    assert!(payloads.get(1).is_none());
    assert_eq!(payloads.get(2).unwrap(), b"tiny");
    let last_len = payloads.get(3).unwrap().len();
    assert_eq!(last_len, 500_000);
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn people_shape_and_columns() {
let df =
crate::read_yxdb(test_path("People.yxdb"), crate::SpatialMode::Raw, false).unwrap();
assert_eq!(df.height(), 200);
assert_eq!(df.width(), 8);
assert!(df
.get_column_names()
.iter()
.any(|n| n.as_str() == "FirstName"));
assert!(df.get_column_names().iter().any(|n| n.as_str() == "Salary"));
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn people_no_null_ids() {
let df =
crate::read_yxdb(test_path("People.yxdb"), crate::SpatialMode::Raw, false).unwrap();
assert_eq!(df.column("PersonId").unwrap().null_count(), 0);
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn strings_edge_cases() {
let df =
crate::read_yxdb(test_path("Strings.yxdb"), crate::SpatialMode::Raw, false).unwrap();
assert_eq!(df.height(), 6);
let vstr = df.column("VarStr").unwrap().str().unwrap();
assert_eq!(vstr.get(0), Some("variable"));
assert_eq!(vstr.get(1), Some(""));
let long_str = vstr.get(3).unwrap();
assert_eq!(long_str.len(), 2000);
assert!(long_str.chars().all(|c| c == 'M'));
assert!(vstr.get(5).is_none());
}
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn strings_unicode() {
let df =
crate::read_yxdb(test_path("Strings.yxdb"), crate::SpatialMode::Raw, false).unwrap();
let vwstr = df.column("VarWStr").unwrap().str().unwrap();
assert_eq!(vwstr.get(0), Some("wïdé"));
assert_eq!(vwstr.get(4), Some("日本語テスト"));
}
/// `SingleColumn.yxdb` has five rows; `Value` starts at 10 and ends at 50.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn single_column_values() {
    let fixture = test_path("SingleColumn.yxdb");
    let frame = crate::read_yxdb(fixture, crate::SpatialMode::Raw, false).unwrap();
    assert_eq!(frame.height(), 5);

    let values = frame.column("Value").unwrap().i32().unwrap();
    assert_eq!(values.get(0), Some(10));
    assert_eq!(values.get(4), Some(50));
}
/// Projecting two columns yields exactly those two, with all rows.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn projection_subset() {
    let frame = YxdbReader::open(test_path("AllTypes.yxdb"))
        .unwrap()
        .into_dataframe_projected(Some(&["BoolCol", "Int32Col"]))
        .unwrap();
    assert_eq!(frame.width(), 2);
    assert_eq!(frame.height(), 2);
    for kept in ["BoolCol", "Int32Col"] {
        assert!(frame.column(kept).is_ok());
    }
    assert!(frame.column("StringCol").is_err());
}

/// `None` as projection returns every column.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn projection_none_returns_all() {
    let frame = YxdbReader::open(test_path("AllTypes.yxdb"))
        .unwrap()
        .into_dataframe_projected(None)
        .unwrap();
    assert_eq!(frame.width(), 16);
}

/// An unknown column name is an error that names the offending column.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn projection_rejects_unknown_columns() {
    let outcome = YxdbReader::open(test_path("AllTypes.yxdb"))
        .unwrap()
        .into_dataframe_projected(Some(&["Int32Col", "NoSuchColumn"]));
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("NoSuchColumn"),
        "Error should mention the unknown column name"
    );
}

/// Projection also works on a file with variable-length records.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn projection_variable_records() {
    let frame = YxdbReader::open(test_path("Strings.yxdb"))
        .unwrap()
        .into_dataframe_projected(Some(&["VarStr"]))
        .unwrap();
    assert_eq!(frame.width(), 1);
    assert_eq!(frame.height(), 6);
    let var_str = frame.column("VarStr").unwrap().str().unwrap();
    assert_eq!(var_str.get(0), Some("variable"));
}
/// `crate::read_yxdb_columns` returns only the requested columns, with all rows.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn read_yxdb_columns_convenience() {
    let fixture = test_path("People.yxdb");
    let frame = crate::read_yxdb_columns(
        fixture,
        &["PersonId", "FirstName"],
        crate::SpatialMode::Raw,
        false,
    )
    .unwrap();
    assert_eq!(frame.width(), 2);
    assert_eq!(frame.height(), 200);
}
/// Opening a plain-text file fails.
// NOTE(review): this also passes when the fixture is missing, because opening a
// non-existent path fails too -- confirm `not_a_yxdb.txt` is committed.
#[test]
fn reject_invalid_text_file() {
    let path = test_path("not_a_yxdb.txt");
    assert!(YxdbReader::open(path).is_err());
}

/// Opening a file that is too small fails.
// NOTE(review): same caveat as above for `too_small.bin`.
#[test]
fn reject_too_small_file() {
    let path = test_path("too_small.bin");
    assert!(YxdbReader::open(path).is_err());
}

/// Opening a path that does not exist fails.
#[test]
fn reject_nonexistent_file() {
    let path = test_path("does_not_exist.yxdb");
    assert!(YxdbReader::open(path).is_err());
}
/// Walks both rows of `AllTypes.yxdb` and checks selected fields by name and index.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn row_reader_all_types() {
    let mut rows = YxdbRowReader::open(test_path("AllTypes.yxdb")).unwrap();
    assert_eq!(rows.num_records(), 2);
    assert_eq!(rows.fields().len(), 16);

    // First record.
    assert!(rows.next().unwrap());
    let all_values = rows.read_all().unwrap();
    assert_eq!(all_values.len(), 16);
    let first_row = [
        ("BoolCol", FieldValue::Bool(Some(true))),
        ("ByteCol", FieldValue::Byte(Some(7))),
        ("Int16Col", FieldValue::Int16(Some(-1234))),
        ("Int32Col", FieldValue::Int32(Some(42000))),
        ("Int64Col", FieldValue::Int64(Some(9_000_000_000))),
    ];
    for (field, expected) in first_row {
        assert_eq!(rows.read_name(field).unwrap(), expected);
    }
    let first_by_name = rows.read_name(rows.fields()[0].name.as_str()).unwrap();
    let first_by_index = rows.read_index(0).unwrap();
    assert_eq!(first_by_name, first_by_index);

    // Second record.
    assert!(rows.next().unwrap());
    let second_row = [
        ("BoolCol", FieldValue::Bool(Some(false))),
        ("ByteCol", FieldValue::Byte(Some(255))),
    ];
    for (field, expected) in second_row {
        assert_eq!(rows.read_name(field).unwrap(), expected);
    }

    // No third record.
    assert!(!rows.next().unwrap());
}

/// Name lookup and index lookup agree; unknown names are rejected.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn row_reader_name_lookup() {
    let mut rows = YxdbRowReader::open(test_path("AllTypes.yxdb")).unwrap();
    assert!(rows.next().unwrap());
    let via_name = rows.read_name("Int32Col").unwrap();
    let via_index = rows.read_index(3).unwrap();
    assert_eq!(via_name, via_index);
    assert_eq!(via_name, FieldValue::Int32(Some(42000)));
    assert!(rows.read_name("NonExistent").is_err());
}

/// Null fields surface as `None` inside the typed `FieldValue`.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn row_reader_null_handling() {
    let mut rows = YxdbRowReader::open(test_path("NullValues.yxdb")).unwrap();
    assert_eq!(rows.num_records(), 3);

    assert!(rows.next().unwrap());
    assert_eq!(rows.read_name("Id").unwrap(), FieldValue::Int32(Some(1)));

    assert!(rows.next().unwrap());
    assert_eq!(rows.read_name("Id").unwrap(), FieldValue::Int32(Some(2)));
    assert_eq!(rows.read_name("NullStr").unwrap(), FieldValue::String(None));
}

/// Iterating to the end yields exactly the number of records in the header.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn row_reader_many_records() {
    let mut rows = YxdbRowReader::open(test_path("ManyRecords.yxdb")).unwrap();
    assert_eq!(rows.num_records(), 50_000);
    let mut seen = 0u64;
    loop {
        if !rows.next().unwrap() {
            break;
        }
        seen += 1;
    }
    assert_eq!(seen, 50_000);
}

/// Every read method fails until `next()` has been called.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn row_reader_error_before_next() {
    let rows = YxdbRowReader::open(test_path("AllTypes.yxdb")).unwrap();
    assert!(rows.read_index(0).is_err());
    assert!(rows.read_all().is_err());
    assert!(rows.read_name("BoolCol").is_err());
}
#[test]
fn transcode_pure_ascii() {
let input: Vec<u8> = "Hello, World!"
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, "Hello, World!");
}
#[test]
fn transcode_latin_extended_u0100_range() {
let text = "ĀĂĄĆĈĊČ";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_eight_consecutive_u0100() {
let text = "ĀĀĀĀĀĀĀĀ";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
assert_eq!(input.len(), 16); let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_mixed_ascii_and_u0100_range() {
let text = "AĀBĂCĄDĆEĈFĊGČHā";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_cyrillic() {
let text = "Привет мир";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_greek() {
let text = "αβγδεζηθ";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_cjk() {
let text = "日本語テスト";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_surrogate_pairs() {
let text = "A😀B";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_empty() {
let mut out = String::new();
transcode_utf16le(&[], &mut out);
assert_eq!(out, "");
}
#[test]
fn transcode_single_char() {
let input = 0x0041u16.to_le_bytes(); let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, "A");
}
#[test]
fn transcode_long_ascii_then_nonascii() {
let text = "0123456789ABCDEFÜnïcödé";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_reuse_buffer() {
let mut out = String::new();
let input1: Vec<u8> = "Hello"
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
transcode_utf16le(&input1, &mut out);
assert_eq!(out, "Hello");
let input2: Vec<u8> = "World"
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
transcode_utf16le(&input2, &mut out);
assert_eq!(out, "World");
}
#[test]
fn transcode_exactly_8_ascii() {
let text = "ABCDEFGH";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
assert_eq!(input.len(), 16);
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_exactly_16_ascii() {
let text = "0123456789ABCDEF";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
assert_eq!(input.len(), 32);
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_7_ascii_then_nonascii() {
let text = "1234567Ü";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_9_ascii_then_nonascii() {
let text = "123456789é";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_bmp_boundary() {
let text = "\u{FFFD}A\u{FFFD}";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_multi_surrogate_pairs() {
let text = "𝄞𝄞𝄞𝄞"; let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_null_codepoints() {
let input: Vec<u8> = vec![
0x41, 0x00, 0x00, 0x00, 0x42, 0x00, ];
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, "A\0B");
}
#[test]
fn transcode_latin_ext_b() {
let text = "ƀƁƂƃƄƅƆƇ";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_arabic() {
let text = "مرحبا بالعالم";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_thai() {
let text = "สวัสดีครับ";
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_very_long_ascii() {
let text: String = (0..1000)
.map(|i| char::from(b'A' + (i % 26) as u8))
.collect();
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
#[test]
fn transcode_very_long_nonascii() {
let text: String = (0..500).map(|_| '日').collect();
let input: Vec<u8> = text
.encode_utf16()
.flat_map(|cu| cu.to_le_bytes())
.collect();
let mut out = String::new();
transcode_utf16le(&input, &mut out);
assert_eq!(out, text);
}
/// An empty projection list produces a frame without columns.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn projection_empty_list_returns_no_columns() {
    let frame = YxdbReader::open(test_path("AllTypes.yxdb"))
        .unwrap()
        .into_dataframe_projected(Some(&[]))
        .unwrap();
    assert_eq!(frame.width(), 0);
}

/// A duplicated column name is either collapsed to one column or rejected with
/// an error that mentions "duplicate"; both outcomes are accepted.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn projection_duplicate_column_names() {
    let outcome = YxdbReader::open(test_path("AllTypes.yxdb"))
        .unwrap()
        .into_dataframe_projected(Some(&["Int32Col", "Int32Col"]));
    if let Err(err) = &outcome {
        let msg = format!("{err}");
        assert!(msg.contains("duplicate"), "unexpected error: {msg}");
        return;
    }
    let frame = outcome.unwrap();
    assert_eq!(frame.width(), 1);
    assert_eq!(frame.height(), 2);
}
/// Every field index of the first record can be read without error.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn row_reader_read_index_all_fields() {
    let mut rows = YxdbRowReader::open(test_path("AllTypes.yxdb")).unwrap();
    assert!(rows.next().unwrap());
    let field_count = rows.fields().len();
    for index in 0..field_count {
        rows.read_index(index).unwrap();
    }
}

/// An index past the last field is an error, not a panic.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn row_reader_out_of_bounds_index() {
    let mut rows = YxdbRowReader::open(test_path("AllTypes.yxdb")).unwrap();
    assert!(rows.next().unwrap());
    assert!(rows.read_index(9999).is_err());
}

/// Summing the single Int32 column over all five rows gives 150.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn row_reader_single_column_file() {
    let mut rows = YxdbRowReader::open(test_path("SingleColumn.yxdb")).unwrap();
    assert_eq!(rows.num_records(), 5);
    let mut total = 0i64;
    while rows.next().unwrap() {
        // Values of any other shape (including nulls) are skipped.
        match rows.read_index(0).unwrap() {
            FieldValue::Int32(Some(value)) => total += value as i64,
            _ => {}
        }
    }
    assert_eq!(total, 150);
}

/// The first record of `LargeBlob.yxdb` carries a 512 000-byte blob.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn row_reader_large_blob() {
    let mut rows = YxdbRowReader::open(test_path("LargeBlob.yxdb")).unwrap();
    assert!(rows.next().unwrap());
    match rows.read_name("Data").unwrap() {
        FieldValue::Blob(Some(payload)) => assert_eq!(payload.len(), 512_000),
        other => panic!("expected large blob, got {other:?}"),
    }
}
/// A file shorter than the header is rejected with a descriptive message.
#[test]
fn regression_small_file_error() {
    use std::io::Write;
    use tempfile::NamedTempFile;

    let mut tiny_file = NamedTempFile::new().unwrap();
    tiny_file.write_all(b"too short").unwrap();
    tiny_file.flush().unwrap();

    let msg = crate::read_yxdb(tiny_file.path(), crate::SpatialMode::Raw, false)
        .unwrap_err()
        .to_string();
    let is_descriptive = ["too small", "512"]
        .iter()
        .any(|needle| msg.contains(*needle));
    assert!(
        is_descriptive,
        "expected descriptive error for tiny file, got: {msg}"
    );
}
/// Mixing a known and an unknown column is rejected, and the error names the unknown one.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn regression_unknown_column_rejected() {
    let outcome = YxdbReader::open(test_path("AllTypes.yxdb"))
        .unwrap()
        .into_dataframe_projected(Some(&["Int32Col", "DoesNotExist"]));
    assert!(outcome.is_err(), "expected error for unknown column");
    let msg = outcome.unwrap_err().to_string();
    assert!(
        msg.contains("DoesNotExist"),
        "error should mention the missing column name, got: {msg}"
    );
}

/// A projection made up only of unknown columns is rejected.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn regression_all_unknown_columns_rejected() {
    let outcome = YxdbReader::open(test_path("AllTypes.yxdb"))
        .unwrap()
        .into_dataframe_projected(Some(&["Fake1", "Fake2"]));
    assert!(outcome.is_err());
}
/// Writing and re-reading a zero-row frame keeps its columns, with and without projection.
#[test]
fn regression_empty_dataframe_schema_preserved() {
    use polars::prelude::*;
    use tempfile::NamedTempFile;

    let no_ids: Vec<i32> = Vec::new();
    let no_names: Vec<&str> = Vec::new();
    let original = df! {
        "id" => no_ids,
        "name" => no_names
    }
    .unwrap();

    let scratch = NamedTempFile::new().unwrap();
    crate::write_yxdb(scratch.path(), &original, &[]).unwrap();

    // Full read: no rows, both columns.
    let full = crate::read_yxdb(scratch.path(), crate::SpatialMode::Raw, false).unwrap();
    assert_eq!(full.height(), 0);
    assert_eq!(full.width(), 2);

    // Projected read: no rows, only the requested column.
    let projected = YxdbReader::open(scratch.path())
        .unwrap()
        .into_dataframe_projected(Some(&["name"]))
        .unwrap();
    assert_eq!(projected.height(), 0);
    assert_eq!(projected.width(), 1);
    assert_eq!(projected.get_column_names()[0].as_str(), "name");
}
/// A 300 000-byte blob survives a write/read round trip unchanged.
#[test]
fn regression_large_blob_decompression() {
    use polars::prelude::*;
    use tempfile::NamedTempFile;

    let payload: Vec<u8> = vec![0xAB; 300_000];
    let column = Column::new("payload".into(), vec![payload.as_slice()]);
    let original = test_df(vec![column]);

    let scratch = NamedTempFile::new().unwrap();
    crate::write_yxdb(scratch.path(), &original, &[]).unwrap();

    let reread = crate::read_yxdb(scratch.path(), crate::SpatialMode::Raw, false).unwrap();
    let blobs = reread.column("payload").unwrap().binary().unwrap();
    assert_eq!(blobs.get(0).unwrap().len(), 300_000);
    assert!(blobs.get(0).unwrap().iter().all(|&byte| byte == 0xAB));
}
/// `crate::read_yxdb_columns` rejects an unknown column and names it in the error.
#[ignore = "requires test_files/*.yxdb fixtures (never committed) - see TODO"]
#[test]
fn regression_read_yxdb_columns_rejects_unknown() {
    let fixture = test_path("AllTypes.yxdb");
    let outcome = crate::read_yxdb_columns(
        fixture,
        &["Int32Col", "Nonexistent"],
        crate::SpatialMode::Raw,
        false,
    );
    assert!(outcome.is_err());
    let msg = outcome.unwrap_err().to_string();
    assert!(
        msg.contains("Nonexistent"),
        "error should mention the unknown column, got: {msg}"
    );
}
}