use crate::datatypes::values::{BorrowedValue, Value};
use crate::graph::schema::{InternedKey, StringInterner, TypeSchema};
use crate::graph::storage::column_store::{ColumnStore, TypedColumn};
use crate::graph::storage::mapped::mmap_vec::{MmapBytes, MmapOrVec};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
const BUF_SIZE: usize = 64 * 1024;
pub(super) struct TypeWriter {
schema: Arc<TypeSchema>,
meta: HashMap<String, String>,
out_dir: PathBuf,
columns: Vec<ColumnWriter>,
id_col: ColumnWriter,
title_col: ColumnWriter,
row_count: u32,
overflow_offsets: Vec<u64>,
overflow_data: Vec<u8>,
overflow_used: bool,
}
enum ColumnWriter {
Int64 {
data: BufWriter<File>,
nulls: BufWriter<File>,
len: u32,
data_path: PathBuf,
nulls_path: PathBuf,
},
Float64 {
data: BufWriter<File>,
nulls: BufWriter<File>,
len: u32,
data_path: PathBuf,
nulls_path: PathBuf,
},
UniqueId {
data: BufWriter<File>,
nulls: BufWriter<File>,
len: u32,
data_path: PathBuf,
nulls_path: PathBuf,
},
Bool {
data: BufWriter<File>,
nulls: BufWriter<File>,
len: u32,
data_path: PathBuf,
nulls_path: PathBuf,
},
Date {
data: BufWriter<File>,
nulls: BufWriter<File>,
len: u32,
data_path: PathBuf,
nulls_path: PathBuf,
},
Str {
offsets: BufWriter<File>,
data: BufWriter<File>,
nulls: BufWriter<File>,
cursor: u64,
len: u32,
offsets_path: PathBuf,
data_path: PathBuf,
nulls_path: PathBuf,
},
Mixed { values: Vec<Value> },
}
impl TypeWriter {
pub(super) fn new(
schema: Arc<TypeSchema>,
meta: HashMap<String, String>,
out_dir: PathBuf,
interner: &StringInterner,
id_type: &str,
title_type: &str,
) -> io::Result<Self> {
std::fs::create_dir_all(&out_dir)?;
let mut columns = Vec::with_capacity(schema.len());
for (_slot, ik) in schema.iter() {
let col_name = interner.resolve(ik);
let type_str = meta.get(col_name).map(|s| s.as_str()).unwrap_or("mixed");
columns.push(ColumnWriter::open(type_str, col_name, &out_dir)?);
}
let id_col = ColumnWriter::open(id_type, "__id__", &out_dir)?;
let title_col = ColumnWriter::open(title_type, "__title__", &out_dir)?;
let overflow_offsets = vec![0u64];
Ok(Self {
schema,
meta,
out_dir,
columns,
id_col,
title_col,
row_count: 0,
overflow_offsets,
overflow_data: Vec::new(),
overflow_used: false,
})
}
#[cfg(test)]
pub(super) fn push_row(
&mut self,
id: &Value,
title: &Value,
props: &[(InternedKey, Value)],
) -> io::Result<u32> {
self.id_col.push(id)?;
self.title_col.push(title)?;
let mut slot_values: Vec<Option<&Value>> = vec![None; self.columns.len()];
for (key, value) in props {
if let Some(slot) = self.schema.slot(*key) {
slot_values[slot as usize] = Some(value);
}
}
for (slot, slot_val) in slot_values.iter().enumerate() {
let v: &Value = slot_val.unwrap_or(&Value::Null);
self.columns[slot].push(v)?;
}
let row_id = self.row_count;
self.row_count = self
.row_count
.checked_add(1)
.ok_or_else(|| io::Error::other("TypeWriter: row count overflow > u32::MAX"))?;
Ok(row_id)
}
pub(super) fn push_row_borrowed<F>(
&mut self,
id: BorrowedValue<'_>,
title: BorrowedValue<'_>,
f: F,
) -> io::Result<u32>
where
F: FnOnce(&mut RowVisitor<'_>) -> io::Result<()>,
{
self.id_col.push_borrowed(id)?;
self.title_col.push_borrowed(title)?;
let mut row_overflow: Vec<u8> = Vec::new();
row_overflow.extend_from_slice(&0u16.to_le_bytes());
let mut row_overflow_count: u16 = 0;
let filled = {
let mut visitor = RowVisitor {
schema: &self.schema,
columns: &mut self.columns,
filled: SmallBitmap::new(self.schema.len()),
overflow_buf: &mut row_overflow,
overflow_count: &mut row_overflow_count,
};
f(&mut visitor)?;
visitor.filled
};
for slot in 0..self.columns.len() {
if !filled.get(slot) {
self.columns[slot].push_borrowed(BorrowedValue::Null)?;
}
}
if row_overflow_count > 0 {
self.overflow_used = true;
}
row_overflow[0..2].copy_from_slice(&row_overflow_count.to_le_bytes());
self.overflow_data.extend_from_slice(&row_overflow);
self.overflow_offsets.push(self.overflow_data.len() as u64);
let row_id = self.row_count;
self.row_count = self
.row_count
.checked_add(1)
.ok_or_else(|| io::Error::other("TypeWriter: row count overflow > u32::MAX"))?;
Ok(row_id)
}
pub(super) fn finalize(self, interner: &StringInterner) -> io::Result<Arc<ColumnStore>> {
let row_count = self.row_count;
let mut typed_columns: Vec<TypedColumn> = Vec::with_capacity(self.columns.len());
for col in self.columns {
typed_columns.push(col.finalize(row_count)?);
}
let id_typed = self.id_col.finalize(row_count)?;
let title_typed = self.title_col.finalize(row_count)?;
let mut store = ColumnStore::new(Arc::clone(&self.schema), &self.meta, interner);
store.replace_columns(typed_columns);
store.replace_id_column(id_typed);
store.replace_title_column(title_typed);
store.set_row_count(row_count);
if self.overflow_used {
use crate::graph::storage::mapped::mmap_vec::{MmapBytes, MmapOrVec};
let offsets_vec = MmapOrVec::from_vec(self.overflow_offsets);
let mut data_bytes = MmapBytes::new();
data_bytes.extend(&self.overflow_data);
store.replace_overflow_bag(offsets_vec, data_bytes);
}
let _ = self.out_dir;
Ok(Arc::new(store))
}
}
pub(super) struct RowVisitor<'a> {
schema: &'a Arc<TypeSchema>,
columns: &'a mut Vec<ColumnWriter>,
filled: SmallBitmap,
overflow_buf: &'a mut Vec<u8>,
overflow_count: &'a mut u16,
}
impl<'a> RowVisitor<'a> {
pub(super) fn push_property(
&mut self,
key: InternedKey,
value: BorrowedValue<'_>,
) -> io::Result<()> {
if let Some(slot) = self.schema.slot(key) {
let s = slot as usize;
if self.filled.get(s) {
return Err(io::Error::other(format!(
"TypeWriter::push_row_borrowed: slot {} filled twice in one row",
s
)));
}
self.columns[s].push_borrowed(value)?;
self.filled.set(s);
} else {
if !matches!(value, BorrowedValue::Null) {
ColumnStore::serialize_overflow_value_borrowed(self.overflow_buf, key, &value);
*self.overflow_count = self.overflow_count.saturating_add(1);
}
}
Ok(())
}
}
struct SmallBitmap {
inline: u64,
overflow: Vec<u64>,
len: usize,
}
impl SmallBitmap {
fn new(len: usize) -> Self {
let overflow = if len > 64 {
vec![0u64; (len - 64).div_ceil(64)]
} else {
Vec::new()
};
Self {
inline: 0,
overflow,
len,
}
}
fn get(&self, idx: usize) -> bool {
debug_assert!(idx < self.len);
if idx < 64 {
(self.inline >> idx) & 1 == 1
} else {
let i = idx - 64;
(self.overflow[i / 64] >> (i % 64)) & 1 == 1
}
}
fn set(&mut self, idx: usize) {
debug_assert!(idx < self.len);
if idx < 64 {
self.inline |= 1u64 << idx;
} else {
let i = idx - 64;
self.overflow[i / 64] |= 1u64 << (i % 64);
}
}
}
impl ColumnWriter {
fn open(type_str: &str, col_name: &str, out_dir: &Path) -> io::Result<Self> {
let lower = type_str.to_ascii_lowercase();
match lower.as_str() {
"int64" => Self::open_fixed(
col_name,
out_dir,
"i64",
|data, nulls, len, data_path, nulls_path| ColumnWriter::Int64 {
data,
nulls,
len,
data_path,
nulls_path,
},
),
"float64" => Self::open_fixed(
col_name,
out_dir,
"f64",
|data, nulls, len, data_path, nulls_path| ColumnWriter::Float64 {
data,
nulls,
len,
data_path,
nulls_path,
},
),
"uniqueid" => Self::open_fixed(
col_name,
out_dir,
"u32",
|data, nulls, len, data_path, nulls_path| ColumnWriter::UniqueId {
data,
nulls,
len,
data_path,
nulls_path,
},
),
"bool" | "boolean" => Self::open_fixed(
col_name,
out_dir,
"bool",
|data, nulls, len, data_path, nulls_path| ColumnWriter::Bool {
data,
nulls,
len,
data_path,
nulls_path,
},
),
"date" | "datetime" => Self::open_fixed(
col_name,
out_dir,
"i32",
|data, nulls, len, data_path, nulls_path| ColumnWriter::Date {
data,
nulls,
len,
data_path,
nulls_path,
},
),
"string" => {
let offsets_path = out_dir.join(format!("{col_name}.off"));
let data_path = out_dir.join(format!("{col_name}.str"));
let nulls_path = out_dir.join(format!("{col_name}.null"));
let mut offsets = open_buf_writer(&offsets_path)?;
let data = open_buf_writer(&data_path)?;
let nulls = open_buf_writer(&nulls_path)?;
offsets.write_all(&0u64.to_le_bytes())?;
Ok(ColumnWriter::Str {
offsets,
data,
nulls,
cursor: 0,
len: 0,
offsets_path,
data_path,
nulls_path,
})
}
_ => Ok(ColumnWriter::Mixed { values: Vec::new() }),
}
}
fn open_fixed<F>(col_name: &str, out_dir: &Path, data_ext: &str, make: F) -> io::Result<Self>
where
F: FnOnce(BufWriter<File>, BufWriter<File>, u32, PathBuf, PathBuf) -> Self,
{
let data_path = out_dir.join(format!("{col_name}.{data_ext}"));
let nulls_path = out_dir.join(format!("{col_name}.null"));
let data = open_buf_writer(&data_path)?;
let nulls = open_buf_writer(&nulls_path)?;
Ok(make(data, nulls, 0, data_path, nulls_path))
}
fn push_borrowed(&mut self, value: BorrowedValue<'_>) -> io::Result<()> {
match self {
ColumnWriter::Int64 {
data, nulls, len, ..
} => {
let (v, is_null): (i64, u8) = match value {
BorrowedValue::Int64(x) => (x, 0),
BorrowedValue::Null => (0, 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push_borrowed: expected Int64/Null, got {:?}",
borrowed_kind(&other)
)))
}
};
data.write_all(&v.to_le_bytes())?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::Float64 {
data, nulls, len, ..
} => {
let (v, is_null): (f64, u8) = match value {
BorrowedValue::Float64(x) => (x, 0),
BorrowedValue::Int64(x) => (x as f64, 0),
BorrowedValue::Null => (0.0, 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push_borrowed: expected Float64/Int64/Null, got {:?}",
borrowed_kind(&other)
)))
}
};
data.write_all(&v.to_le_bytes())?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::UniqueId {
data, nulls, len, ..
} => {
let (v, is_null): (u32, u8) = match value {
BorrowedValue::UniqueId(x) => (x, 0),
BorrowedValue::Null => (0, 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push_borrowed: expected UniqueId/Null, got {:?}",
borrowed_kind(&other)
)))
}
};
data.write_all(&v.to_le_bytes())?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::Bool {
data, nulls, len, ..
} => {
let (v, is_null): (u8, u8) = match value {
BorrowedValue::Boolean(b) => (b as u8, 0),
BorrowedValue::Null => (0, 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push_borrowed: expected Boolean/Null, got {:?}",
borrowed_kind(&other)
)))
}
};
data.write_all(&[v])?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::Date {
data, nulls, len, ..
} => {
let (days, is_null): (i32, u8) = match value {
BorrowedValue::DateTime(d) => {
let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1)
.expect("1970-01-01 is a valid date");
((d - epoch).num_days() as i32, 0)
}
BorrowedValue::Null => (0, 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push_borrowed: expected DateTime/Null, got {:?}",
borrowed_kind(&other)
)))
}
};
data.write_all(&days.to_le_bytes())?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::Str {
offsets,
data,
nulls,
cursor,
len,
..
} => {
let (bytes, is_null): (&[u8], u8) = match value {
BorrowedValue::String(s) => (s.as_bytes(), 0),
BorrowedValue::Null => (&[], 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push_borrowed: expected String/Null, got {:?}",
borrowed_kind(&other)
)))
}
};
if !bytes.is_empty() {
data.write_all(bytes)?;
*cursor = cursor
.checked_add(bytes.len() as u64)
.ok_or_else(|| io::Error::other("Str data cursor overflow"))?;
}
offsets.write_all(&cursor.to_le_bytes())?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::Mixed { values } => {
values.push(value.to_value());
}
}
Ok(())
}
#[cfg(test)]
fn push(&mut self, value: &Value) -> io::Result<()> {
match self {
ColumnWriter::Int64 {
data, nulls, len, ..
} => {
let (v, is_null): (i64, u8) = match value {
Value::Int64(x) => (*x, 0),
Value::Null => (0, 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push: expected Int64/Null, got {:?}",
value_kind(other)
)))
}
};
data.write_all(&v.to_le_bytes())?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::Float64 {
data, nulls, len, ..
} => {
let (v, is_null): (f64, u8) = match value {
Value::Float64(x) => (*x, 0),
Value::Int64(x) => (*x as f64, 0),
Value::Null => (0.0, 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push: expected Float64/Int64/Null, got {:?}",
value_kind(other)
)))
}
};
data.write_all(&v.to_le_bytes())?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::UniqueId {
data, nulls, len, ..
} => {
let (v, is_null): (u32, u8) = match value {
Value::UniqueId(x) => (*x, 0),
Value::Null => (0, 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push: expected UniqueId/Null, got {:?}",
value_kind(other)
)))
}
};
data.write_all(&v.to_le_bytes())?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::Bool {
data, nulls, len, ..
} => {
let (v, is_null): (u8, u8) = match value {
Value::Boolean(b) => (*b as u8, 0),
Value::Null => (0, 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push: expected Boolean/Null, got {:?}",
value_kind(other)
)))
}
};
data.write_all(&[v])?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::Date {
data, nulls, len, ..
} => {
let (days, is_null): (i32, u8) = match value {
Value::DateTime(d) => {
let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1)
.expect("1970-01-01 is a valid date");
((*d - epoch).num_days() as i32, 0)
}
Value::Null => (0, 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push: expected DateTime/Null, got {:?}",
value_kind(other)
)))
}
};
data.write_all(&days.to_le_bytes())?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::Str {
offsets,
data,
nulls,
cursor,
len,
..
} => {
let (bytes, is_null): (&[u8], u8) = match value {
Value::String(s) => (s.as_bytes(), 0),
Value::Null => (&[], 1),
other => {
return Err(io::Error::other(format!(
"TypeWriter::push: expected String/Null, got {:?}",
value_kind(other)
)))
}
};
if !bytes.is_empty() {
data.write_all(bytes)?;
*cursor = cursor
.checked_add(bytes.len() as u64)
.ok_or_else(|| io::Error::other("Str data cursor overflow"))?;
}
offsets.write_all(&cursor.to_le_bytes())?;
nulls.write_all(&[is_null])?;
*len = len.checked_add(1).ok_or_else(row_overflow)?;
}
ColumnWriter::Mixed { values } => {
values.push(value.clone());
}
}
Ok(())
}
fn finalize(self, row_count: u32) -> io::Result<TypedColumn> {
match self {
ColumnWriter::Int64 {
data,
nulls,
len,
data_path,
nulls_path,
} => {
close_buf_writers([data, nulls])?;
expect_len(len, row_count, &data_path)?;
let data = MmapOrVec::<i64>::load_mapped(&data_path, row_count as usize)?;
let nulls = MmapOrVec::<u8>::load_mapped(&nulls_path, row_count as usize)?;
Ok(TypedColumn::Int64 { data, nulls })
}
ColumnWriter::Float64 {
data,
nulls,
len,
data_path,
nulls_path,
} => {
close_buf_writers([data, nulls])?;
expect_len(len, row_count, &data_path)?;
let data = MmapOrVec::<f64>::load_mapped(&data_path, row_count as usize)?;
let nulls = MmapOrVec::<u8>::load_mapped(&nulls_path, row_count as usize)?;
Ok(TypedColumn::Float64 { data, nulls })
}
ColumnWriter::UniqueId {
data,
nulls,
len,
data_path,
nulls_path,
} => {
close_buf_writers([data, nulls])?;
expect_len(len, row_count, &data_path)?;
let data = MmapOrVec::<u32>::load_mapped(&data_path, row_count as usize)?;
let nulls = MmapOrVec::<u8>::load_mapped(&nulls_path, row_count as usize)?;
Ok(TypedColumn::UniqueId { data, nulls })
}
ColumnWriter::Bool {
data,
nulls,
len,
data_path,
nulls_path,
} => {
close_buf_writers([data, nulls])?;
expect_len(len, row_count, &data_path)?;
let data = MmapOrVec::<u8>::load_mapped(&data_path, row_count as usize)?;
let nulls = MmapOrVec::<u8>::load_mapped(&nulls_path, row_count as usize)?;
Ok(TypedColumn::Bool { data, nulls })
}
ColumnWriter::Date {
data,
nulls,
len,
data_path,
nulls_path,
} => {
close_buf_writers([data, nulls])?;
expect_len(len, row_count, &data_path)?;
let data = MmapOrVec::<i32>::load_mapped(&data_path, row_count as usize)?;
let nulls = MmapOrVec::<u8>::load_mapped(&nulls_path, row_count as usize)?;
Ok(TypedColumn::Date { data, nulls })
}
ColumnWriter::Str {
offsets,
data,
nulls,
cursor,
len,
offsets_path,
data_path,
nulls_path,
} => {
close_buf_writers([offsets, data, nulls])?;
expect_len(len, row_count, &offsets_path)?;
let offsets =
MmapOrVec::<u64>::load_mapped(&offsets_path, (row_count as usize) + 1)?;
let data = MmapBytes::load_mapped(&data_path, cursor as usize)?;
let nulls = MmapOrVec::<u8>::load_mapped(&nulls_path, row_count as usize)?;
Ok(TypedColumn::Str {
offsets,
data,
nulls,
relocated: std::collections::HashMap::new(),
})
}
ColumnWriter::Mixed { values } => {
if values.len() as u32 != row_count {
return Err(io::Error::other(format!(
"TypeWriter Mixed column finalize: expected {} values, got {}",
row_count,
values.len()
)));
}
Ok(TypedColumn::Mixed { data: values })
}
}
}
}
fn open_buf_writer(path: &Path) -> io::Result<BufWriter<File>> {
let f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)?;
Ok(BufWriter::with_capacity(BUF_SIZE, f))
}
fn close_buf_writers<const N: usize>(writers: [BufWriter<File>; N]) -> io::Result<()> {
for mut w in writers {
w.flush()?;
}
Ok(())
}
fn expect_len(actual: u32, expected: u32, path: &Path) -> io::Result<()> {
if actual != expected {
return Err(io::Error::other(format!(
"TypeWriter::finalize: column {} pushed {} rows, expected {}",
path.display(),
actual,
expected
)));
}
Ok(())
}
fn row_overflow() -> io::Error {
io::Error::other("TypeWriter column row count overflow > u32::MAX")
}
fn borrowed_kind(v: &BorrowedValue<'_>) -> &'static str {
match v {
BorrowedValue::Null => "Null",
BorrowedValue::Boolean(_) => "Boolean",
BorrowedValue::Int64(_) => "Int64",
BorrowedValue::Float64(_) => "Float64",
BorrowedValue::UniqueId(_) => "UniqueId",
BorrowedValue::String(_) => "String",
BorrowedValue::DateTime(_) => "DateTime",
}
}
#[cfg(test)]
fn value_kind(v: &Value) -> &'static str {
v.type_name()
}
#[cfg(test)]
#[allow(clippy::approx_constant)]
mod tests {
use super::*;
use crate::graph::schema::TypeSchema;
use tempfile::TempDir;
fn make_schema(interner: &mut StringInterner, props: &[&str]) -> Arc<TypeSchema> {
let keys: Vec<InternedKey> = props.iter().map(|p| interner.get_or_intern(p)).collect();
Arc::new(TypeSchema::from_keys(keys))
}
#[test]
fn round_trip_mixed_types() {
let mut interner = StringInterner::new();
let schema = make_schema(&mut interner, &["age", "name", "active", "score"]);
let mut meta: HashMap<String, String> = HashMap::new();
meta.insert("age".to_string(), "int64".to_string());
meta.insert("name".to_string(), "string".to_string());
meta.insert("active".to_string(), "bool".to_string());
meta.insert("score".to_string(), "float64".to_string());
let dir = TempDir::new().unwrap();
let mut writer = TypeWriter::new(
Arc::clone(&schema),
meta.clone(),
dir.path().to_path_buf(),
&interner,
"string",
"string",
)
.unwrap();
let key_age = interner.get_or_intern("age");
let key_name = interner.get_or_intern("name");
let key_active = interner.get_or_intern("active");
let key_score = interner.get_or_intern("score");
let names = ["Alice", "Bob", "Carla-with-a-longer-name", "", "Dee"];
for i in 0..5u32 {
let id = Value::String(format!("id-{i}"));
let title = Value::String(format!("Title {i}"));
let props = vec![
(key_age, Value::Int64(20 + i as i64)),
(key_name, Value::String(names[i as usize].to_string())),
(key_active, Value::Boolean(i % 2 == 0)),
(key_score, Value::Float64(i as f64 * 1.5)),
];
let row = writer.push_row(&id, &title, &props).unwrap();
assert_eq!(row, i);
}
let store = writer.finalize(&interner).unwrap();
assert_eq!(store.row_count(), 5);
for i in 0..5u32 {
assert_eq!(store.get_id(i).unwrap(), Value::String(format!("id-{i}")));
assert_eq!(
store.get_title(i).unwrap(),
Value::String(format!("Title {i}"))
);
assert_eq!(store.get(i, key_age).unwrap(), Value::Int64(20 + i as i64));
let expected_name = names[i as usize].to_string();
if expected_name.is_empty() {
assert_eq!(
store.get(i, key_name).unwrap(),
Value::String(String::new())
);
} else {
assert_eq!(
store.get(i, key_name).unwrap(),
Value::String(expected_name)
);
}
assert_eq!(
store.get(i, key_active).unwrap(),
Value::Boolean(i % 2 == 0)
);
assert_eq!(
store.get(i, key_score).unwrap(),
Value::Float64(i as f64 * 1.5)
);
}
}
#[test]
fn null_values_round_trip() {
let mut interner = StringInterner::new();
let schema = make_schema(&mut interner, &["x", "s"]);
let mut meta: HashMap<String, String> = HashMap::new();
meta.insert("x".to_string(), "int64".to_string());
meta.insert("s".to_string(), "string".to_string());
let key_x = interner.get_or_intern("x");
let key_s = interner.get_or_intern("s");
let dir = TempDir::new().unwrap();
let mut writer = TypeWriter::new(
Arc::clone(&schema),
meta,
dir.path().to_path_buf(),
&interner,
"string",
"string",
)
.unwrap();
writer
.push_row(
&Value::Null,
&Value::Null,
&[(key_x, Value::Null), (key_s, Value::Null)],
)
.unwrap();
writer
.push_row(
&Value::String("a".into()),
&Value::String("T".into()),
&[
(key_x, Value::Int64(7)),
(key_s, Value::String("hello".into())),
],
)
.unwrap();
let store = writer.finalize(&interner).unwrap();
assert_eq!(store.row_count(), 2);
assert!(store.get_id(0).is_none());
assert!(store.get_title(0).is_none());
assert!(store.get(0, key_x).is_none());
assert!(store.get(0, key_s).is_none());
assert_eq!(store.get_id(1).unwrap(), Value::String("a".into()));
assert_eq!(store.get(1, key_x).unwrap(), Value::Int64(7));
assert_eq!(store.get(1, key_s).unwrap(), Value::String("hello".into()));
}
#[test]
fn mixed_column_heterogeneous() {
let mut interner = StringInterner::new();
let schema = make_schema(&mut interner, &["payload"]);
let mut meta: HashMap<String, String> = HashMap::new();
meta.insert("payload".to_string(), "mixed".to_string());
let key_p = interner.get_or_intern("payload");
let dir = TempDir::new().unwrap();
let mut writer = TypeWriter::new(
Arc::clone(&schema),
meta,
dir.path().to_path_buf(),
&interner,
"string",
"string",
)
.unwrap();
let values = [
Value::Int64(42),
Value::String("hello".into()),
Value::Float64(3.14),
Value::Null,
Value::Boolean(true),
];
for (i, v) in values.iter().enumerate() {
writer
.push_row(
&Value::String(format!("id{i}")),
&Value::Null,
&[(key_p, v.clone())],
)
.unwrap();
}
let store = writer.finalize(&interner).unwrap();
for (i, v) in values.iter().enumerate() {
match v {
Value::Null => assert!(store.get(i as u32, key_p).is_none()),
_ => assert_eq!(store.get(i as u32, key_p).unwrap(), v.clone()),
}
}
}
}