use super::*;
pub(crate) fn deserialize_table(
cur: &mut Cursor<'_>,
cat: &mut Catalog,
version: u8,
) -> Result<(), StorageError> {
let table_name = cur.read_str()?;
let name = table_name.clone();
let col_count = cur.read_u16()? as usize;
let mut cols = Vec::with_capacity(col_count);
for _ in 0..col_count {
let c_name = cur.read_str()?;
let ty = cur.read_data_type()?;
let nullable = cur.read_u8()? != 0;
let default = match cur.read_u8()? {
0 => None,
1 => Some(cur.read_value()?),
other => {
return Err(StorageError::Corrupt(format!(
"unknown default tag: {other}"
)));
}
};
let auto_increment = cur.read_u8()? != 0;
cols.push(ColumnSchema {
name: c_name,
ty,
nullable,
default,
runtime_default: None,
auto_increment,
user_enum_type: None,
user_domain_type: None,
on_update_runtime: None,
collation: Collation::Binary,
is_unsigned: false,
inline_enum_variants: None,
inline_set_variants: None,
});
}
let n_cols = cols.len();
cat.create_table(TableSchema::new(name, cols))?;
let t = cat.tables.last_mut().expect("create_table just pushed");
deserialize_rows(cur, t, n_cols)?;
deserialize_indices(cur, t, version)?;
if version >= 11 {
let has = cur.read_u8()?;
let hot_tier_bytes = match has {
0 => None,
1 => Some(cur.read_u64()?),
other => {
return Err(StorageError::Corrupt(format!(
"hot_tier_bytes appendix: unknown has-value byte {other}"
)));
}
};
t.schema_mut().hot_tier_bytes = hot_tier_bytes;
}
if version >= 13 {
let fk_count = cur.read_u16()? as usize;
let mut fks = Vec::with_capacity(fk_count);
for _ in 0..fk_count {
let name = match cur.read_u8()? {
0 => None,
1 => Some(cur.read_str()?),
other => {
return Err(StorageError::Corrupt(format!(
"FK appendix: unknown has-name byte {other}"
)));
}
};
let local_arity = cur.read_u16()? as usize;
let mut local_columns = Vec::with_capacity(local_arity);
for _ in 0..local_arity {
local_columns.push(cur.read_u16()? as usize);
}
let parent_table = cur.read_str()?;
let parent_arity = cur.read_u16()? as usize;
if parent_arity != local_arity {
return Err(StorageError::Corrupt(format!(
"FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
)));
}
let mut parent_columns = Vec::with_capacity(parent_arity);
for _ in 0..parent_arity {
parent_columns.push(cur.read_u16()? as usize);
}
let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
})?;
let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
StorageError::Corrupt("FK appendix: unknown on_update tag".into())
})?;
fks.push(ForeignKeyConstraint {
name,
local_columns,
parent_table,
parent_columns,
on_delete,
on_update,
});
}
t.schema_mut().foreign_keys = fks;
}
if version >= 15 {
let uc_count = cur.read_u16()? as usize;
let mut ucs = Vec::with_capacity(uc_count);
for _ in 0..uc_count {
let is_pk = cur.read_u8()? != 0;
let arity = cur.read_u16()? as usize;
let mut cols = Vec::with_capacity(arity);
for _ in 0..arity {
cols.push(cur.read_u16()? as usize);
}
let nulls_not_distinct = if version >= 23 {
cur.read_u8()? != 0
} else {
false
};
ucs.push(UniquenessConstraint {
is_primary_key: is_pk,
columns: cols,
nulls_not_distinct,
});
}
t.schema_mut().uniqueness_constraints = ucs;
let rt_count = cur.read_u16()? as usize;
for _ in 0..rt_count {
let pos = cur.read_u16()? as usize;
let expr = cur.read_str()?;
if let Some(col) = t.schema_mut().columns.get_mut(pos) {
col.runtime_default = Some(expr);
}
}
}
if version >= 23 {
let check_count = cur.read_u16()? as usize;
let mut checks = Vec::with_capacity(check_count);
for _ in 0..check_count {
checks.push(cur.read_str()?);
}
t.schema_mut().checks = checks;
}
if version >= 29 {
let binding_count = cur.read_u16()? as usize;
for _ in 0..binding_count {
let col_pos = cur.read_u16()? as usize;
let ename = cur.read_str()?;
if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
col.user_enum_type = Some(ename);
}
}
}
if version >= 30 {
let binding_count = cur.read_u16()? as usize;
for _ in 0..binding_count {
let col_pos = cur.read_u16()? as usize;
let dname = cur.read_str()?;
if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
col.user_domain_type = Some(dname);
}
}
}
if version >= 32 {
let binding_count = cur.read_u16()? as usize;
for _ in 0..binding_count {
let col_pos = cur.read_u16()? as usize;
let expr_src = cur.read_str()?;
if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
col.on_update_runtime = Some(expr_src);
}
}
}
if version >= 34 {
let binding_count = cur.read_u16()? as usize;
for _ in 0..binding_count {
let col_pos = cur.read_u16()? as usize;
let tag = cur.read_u8()?;
let collation = match tag {
Collation::TAG_CASE_INSENSITIVE => Collation::CaseInsensitive,
_ => Collation::Binary,
};
if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
col.collation = collation;
}
}
}
if version >= 35 {
let binding_count = cur.read_u16()? as usize;
for _ in 0..binding_count {
let col_pos = cur.read_u16()? as usize;
if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
col.is_unsigned = true;
}
}
}
if version >= 41 {
let binding_count = cur.read_u16()? as usize;
for _ in 0..binding_count {
let col_pos = cur.read_u16()? as usize;
let variant_count = cur.read_u16()? as usize;
let mut variants = Vec::with_capacity(variant_count);
for _ in 0..variant_count {
variants.push(cur.read_str()?);
}
if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
col.inline_enum_variants = Some(variants);
}
}
}
if version >= 42 {
let binding_count = cur.read_u16()? as usize;
for _ in 0..binding_count {
let col_pos = cur.read_u16()? as usize;
let variant_count = cur.read_u16()? as usize;
let mut variants = Vec::with_capacity(variant_count);
for _ in 0..variant_count {
variants.push(cur.read_str()?);
}
if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
col.inline_set_variants = Some(variants);
}
}
}
let _ = table_name;
Ok(())
}
/// Decodes the u32-counted run of dense row bodies into `t.rows`.
///
/// While loading, the encoded length of every row (as reported by
/// `row_body_encoded_len`) is accumulated with saturating addition and the
/// total is stored in `t.hot_bytes` once all rows were read successfully.
/// `_n_cols` is accepted for signature compatibility and is not used.
///
/// # Errors
///
/// Propagates cursor and row-decoding errors; `t.hot_bytes` is left
/// untouched in that case.
fn deserialize_rows(
    cur: &mut Cursor<'_>,
    t: &mut Table,
    _n_cols: usize,
) -> Result<(), StorageError> {
    let row_count = cur.read_u32()? as usize;
    let mut running_total = 0u64;
    for _ in 0..row_count {
        // The row decoder works on the unread remainder and reports how
        // many bytes it used, so the cursor is advanced by hand.
        let (row, used) =
            decode_row_body_dense(&cur.buf[cur.pos..], &t.schema, cur.codec_version)?;
        cur.pos += used;
        let encoded = row_body_encoded_len(&row, &t.schema) as u64;
        running_total = running_total.saturating_add(encoded);
        t.rows.push_mut(row);
    }
    t.hot_bytes = running_total;
    Ok(())
}
/// Decodes the u16-counted list of index records and restores each one on `t`.
///
/// Every record holds the index name, the position of the indexed column and
/// a kind tag (0..=5) that selects the payload and the `restore_*` call.
/// From `version` 12 on a record also carries INCLUDE columns, an optional
/// partial predicate and an optional expression; from 16 on a uniqueness
/// flag and extra column positions. Those attributes are applied to the last
/// entry of `t.indices`.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` for an unknown tag byte, a kind tag that
/// is not allowed for `version`, or a column position outside the schema;
/// cursor and restore errors are propagated.
fn deserialize_indices(
    cur: &mut Cursor<'_>,
    t: &mut Table,
    version: u8,
) -> Result<(), StorageError> {
    // Reads `count` u16 column positions, rejecting any that is not below
    // `column_total`. `label` is the leading word of the error message.
    fn read_positions(
        cur: &mut Cursor<'_>,
        count: usize,
        column_total: usize,
        label: &str,
    ) -> Result<Vec<usize>, StorageError> {
        let mut positions: Vec<usize> = Vec::with_capacity(count);
        for _ in 0..count {
            let cp = cur.read_u16()? as usize;
            if cp >= column_total {
                return Err(StorageError::Corrupt(format!(
                    "{label} column position {cp} out of range ({column_total} schema columns)"
                )));
            }
            positions.push(cp);
        }
        Ok(positions)
    }

    // Reads a 0/1 presence byte and, when it is 1, the string that follows.
    fn read_optional_str(
        cur: &mut Cursor<'_>,
        label: &str,
    ) -> Result<Option<String>, StorageError> {
        match cur.read_u8()? {
            0 => Ok(None),
            1 => Ok(Some(cur.read_str()?)),
            other => Err(StorageError::Corrupt(format!(
                "{label} tag: unknown byte {other}"
            ))),
        }
    }

    let index_count = cur.read_u16()? as usize;
    for _ in 0..index_count {
        let idx_name = cur.read_str()?;
        let col_pos = cur.read_u16()? as usize;
        let Some(indexed_column) = t.schema.columns.get(col_pos) else {
            return Err(StorageError::Corrupt(format!(
                "index {idx_name:?} points at non-existent column position {col_pos}"
            )));
        };
        let column_name = indexed_column.name.clone();

        match cur.read_u8()? {
            // B-tree: the map is only stored from version 9 on; before that
            // the index is re-created via `add_index`.
            0 if version >= 9 => {
                let map = read_btree_map(cur)?;
                t.restore_btree_index(idx_name, &column_name, map)?;
            }
            0 => {
                t.add_index(idx_name, &column_name)?;
            }
            1 => {
                let m = cur.read_u16()? as usize;
                let graph = cur.read_nsw_graph(m)?;
                t.restore_nsw_index(idx_name, &column_name, graph)?;
            }
            2 => {
                let column_type = cur.read_data_type()?;
                t.restore_brin_index(idx_name, &column_name, column_type)?;
            }
            3 => {
                let map = read_gin_map(cur)?;
                t.restore_gin_index(idx_name, &column_name, map)?;
            }
            4 if version < 24 => {
                return Err(StorageError::Corrupt(format!(
                    "trigram-GIN index tag 4 found in catalog FILE_VERSION {version}; \
                     FILE_VERSION 24+ required (v7.15.0 introduced this tag)"
                )));
            }
            4 => {
                let map = read_gin_map(cur)?;
                t.restore_gin_trgm_index(idx_name, &column_name, map)?;
            }
            5 if version < 33 => {
                return Err(StorageError::Corrupt(format!(
                    "fulltext-GIN index tag 5 found in catalog FILE_VERSION {version}; \
                     FILE_VERSION 33+ required (v7.17.0 Phase 2.2 introduced this tag)"
                )));
            }
            5 => {
                let map = read_gin_map(cur)?;
                t.restore_gin_fulltext_index(idx_name, &column_name, map)?;
            }
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown index kind tag: {other}"
                )));
            }
        }

        // Records older than version 12 end right after the kind payload.
        if version < 12 {
            continue;
        }

        let num_included = cur.read_u16()? as usize;
        if num_included > 0 {
            let included =
                read_positions(cur, num_included, t.schema.columns.len(), "INCLUDE")?;
            if let Some(last) = t.indices.last_mut() {
                last.included_columns = included;
            }
        }
        if let Some(pred) = read_optional_str(cur, "partial_predicate")? {
            if let Some(last) = t.indices.last_mut() {
                last.partial_predicate = Some(pred);
            }
        }
        if let Some(expr) = read_optional_str(cur, "expression")? {
            if let Some(last) = t.indices.last_mut() {
                last.expression = Some(expr);
            }
        }

        // Uniqueness flag and extra columns were added in version 16.
        if version < 16 {
            continue;
        }

        match cur.read_u8()? {
            0 => {}
            1 => {
                if let Some(last) = t.indices.last_mut() {
                    last.is_unique = true;
                }
            }
            other => {
                return Err(StorageError::Corrupt(format!(
                    "is_unique tag: unknown byte {other}"
                )));
            }
        }
        let n = cur.read_u16()? as usize;
        if n > 0 {
            let extras = read_positions(cur, n, t.schema.columns.len(), "extra")?;
            if let Some(last) = t.indices.last_mut() {
                last.extra_column_positions = extras;
            }
        }
    }
    Ok(())
}
/// Reads a u32-counted B-tree index map: each entry is an index key followed
/// by a u32-counted list of row locators.
///
/// The locator count comes straight from the file, so the preallocation for
/// each locator list is capped by the number of unread bytes. A corrupt count
/// therefore ends in a decode error rather than an oversized allocation.
///
/// # Errors
///
/// Propagates cursor/key errors and returns `StorageError::Corrupt` when a
/// row locator cannot be decoded.
fn read_btree_map(
    cur: &mut Cursor<'_>,
) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
    let entry_count = cur.read_u32()? as usize;
    let mut map = PersistentBTreeMap::new();
    for _ in 0..entry_count {
        let key = cur.read_index_key()?;
        let locator_count = cur.read_u32()? as usize;
        // Capacity is only a hint; never reserve more slots than there are
        // bytes left to decode from.
        let remaining = cur.buf.len().saturating_sub(cur.pos);
        let mut locators = Vec::with_capacity(locator_count.min(remaining));
        for _ in 0..locator_count {
            let tail = &cur.buf[cur.pos..];
            let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
                StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
            })?;
            cur.pos += consumed;
            locators.push(loc);
        }
        map.insert_mut(key, locators);
    }
    Ok(map)
}
/// Reads a u32-counted GIN posting map: each entry is a string key followed
/// by a u32-counted list of row locators.
///
/// The locator count comes straight from the file, so the preallocation for
/// each posting list is capped by the number of unread bytes. A corrupt count
/// therefore ends in a decode error rather than an oversized allocation.
///
/// # Errors
///
/// Propagates cursor errors and returns `StorageError::Corrupt` when a row
/// locator cannot be decoded.
fn read_gin_map(
    cur: &mut Cursor<'_>,
) -> Result<PersistentBTreeMap<String, Vec<RowLocator>>, StorageError> {
    let entry_count = cur.read_u32()? as usize;
    let mut map = PersistentBTreeMap::new();
    for _ in 0..entry_count {
        let word = cur.read_str()?;
        let locator_count = cur.read_u32()? as usize;
        // Capacity is only a hint; never reserve more slots than there are
        // bytes left to decode from.
        let remaining = cur.buf.len().saturating_sub(cur.pos);
        let mut locators = Vec::with_capacity(locator_count.min(remaining));
        for _ in 0..locator_count {
            let tail = &cur.buf[cur.pos..];
            let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
                StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
            })?;
            cur.pos += consumed;
            locators.push(loc);
        }
        map.insert_mut(word, locators);
    }
    Ok(map)
}
/// Appends the serialized form of graph `g` to `out`.
///
/// Written in order: `m_max_0` (u16), entry node (u32, `u32::MAX` when there
/// is none), entry level (u8), node count (u32) with one level byte per
/// node, layer count (u8), and for each layer its node count (u32) followed
/// by every node's neighbour list (u16 count, then u32 peers).
///
/// # Panics
///
/// Panics when any of the counts or the entry id does not fit the integer
/// width it is stored in.
pub(crate) fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
    let entry_id = match g.entry {
        Some(e) => u32::try_from(e).expect("NSW entry fits in u32"),
        None => u32::MAX,
    };
    let m_max_0 = u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16");
    write_u16(out, m_max_0);
    out.extend_from_slice(&entry_id.to_le_bytes());
    out.push(g.entry_level);

    let node_total = u32::try_from(g.levels.len()).expect("HNSW node count fits in u32");
    write_u32(out, node_total);
    for level in &g.levels {
        out.push(*level);
    }

    let layer_total = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
    out.push(layer_total);
    for layer in &g.layers {
        let layer_nodes =
            u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32");
        write_u32(out, layer_nodes);
        for neighbors in layer {
            let degree =
                u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16");
            write_u16(out, degree);
            for peer in neighbors {
                write_u32(out, *peer);
            }
        }
    }
}
/// Appends the tag byte for column type `t` to `out`, followed by the type's
/// parameters where it has any (vector dimension, VARCHAR/CHAR size, NUMERIC
/// precision and scale, range kind tag).
///
/// # Panics
///
/// Panics for `DataType::Interval`, which has no encoding here.
pub(crate) fn write_data_type(out: &mut Vec<u8>, t: DataType) {
    match t {
        // Parameter-less scalar types: a single tag byte.
        DataType::Int => out.push(1),
        DataType::BigInt => out.push(2),
        DataType::Float => out.push(3),
        DataType::Text => out.push(4),
        DataType::Bool => out.push(5),
        DataType::SmallInt => out.push(7),
        DataType::Date => out.push(11),
        DataType::Timestamp => out.push(12),
        DataType::Json => out.push(13),
        DataType::Jsonb => out.push(16),
        DataType::Timestamptz => out.push(17),
        DataType::Bytes => out.push(18),
        DataType::TextArray => out.push(19),
        DataType::IntArray => out.push(20),
        DataType::BigIntArray => out.push(21),
        DataType::TsVector => out.push(22),
        DataType::TsQuery => out.push(23),
        DataType::Uuid => out.push(24),
        DataType::Time => out.push(25),
        DataType::Year => out.push(26),
        DataType::TimeTz => out.push(27),
        DataType::Money => out.push(28),
        DataType::Hstore => out.push(30),
        DataType::IntArray2D => out.push(31),
        DataType::BigIntArray2D => out.push(32),
        DataType::TextArray2D => out.push(33),
        // Vectors: the tag encodes the element encoding, the dimension follows.
        DataType::Vector { dim, encoding } => {
            let tag = match encoding {
                VecEncoding::F32 => 6,
                VecEncoding::F16 => 15,
                VecEncoding::Sq8 => 14,
            };
            out.push(tag);
            out.extend_from_slice(&dim.to_le_bytes());
        }
        DataType::Varchar(max) => {
            out.push(8);
            out.extend_from_slice(&max.to_le_bytes());
        }
        DataType::Char(size) => {
            out.push(9);
            out.extend_from_slice(&size.to_le_bytes());
        }
        DataType::Numeric { precision, scale } => {
            out.extend_from_slice(&[10, precision, scale]);
        }
        DataType::Range(k) => {
            out.push(29);
            out.push(k.tag());
        }
        DataType::Interval => {
            unreachable!("DataType::Interval has no on-disk encoding in v2.11")
        }
    }
}
impl Cursor<'_> {
    /// Reads a column type: one tag byte plus the parameters that tag
    /// requires (u32 vector dimension, u32 VARCHAR/CHAR size, NUMERIC
    /// precision and scale bytes, range kind byte).
    ///
    /// # Errors
    ///
    /// Returns `StorageError::Corrupt` for an unknown type or range-kind
    /// tag and propagates cursor read errors.
    pub(crate) fn read_data_type(&mut self) -> Result<DataType, StorageError> {
        let ty = match self.read_u8()? {
            1 => DataType::Int,
            2 => DataType::BigInt,
            3 => DataType::Float,
            4 => DataType::Text,
            5 => DataType::Bool,
            6 => DataType::Vector {
                dim: self.read_u32()?,
                encoding: VecEncoding::F32,
            },
            7 => DataType::SmallInt,
            8 => DataType::Varchar(self.read_u32()?),
            9 => DataType::Char(self.read_u32()?),
            10 => {
                let precision = self.read_u8()?;
                let scale = self.read_u8()?;
                DataType::Numeric { precision, scale }
            }
            11 => DataType::Date,
            12 => DataType::Timestamp,
            13 => DataType::Json,
            14 => DataType::Vector {
                dim: self.read_u32()?,
                encoding: VecEncoding::Sq8,
            },
            15 => DataType::Vector {
                dim: self.read_u32()?,
                encoding: VecEncoding::F16,
            },
            16 => DataType::Jsonb,
            17 => DataType::Timestamptz,
            18 => DataType::Bytes,
            19 => DataType::TextArray,
            20 => DataType::IntArray,
            21 => DataType::BigIntArray,
            22 => DataType::TsVector,
            23 => DataType::TsQuery,
            24 => DataType::Uuid,
            25 => DataType::Time,
            26 => DataType::Year,
            27 => DataType::TimeTz,
            28 => DataType::Money,
            29 => {
                let kt = self.read_u8()?;
                match RangeKind::from_tag(kt) {
                    Some(kind) => DataType::Range(kind),
                    None => {
                        return Err(StorageError::Corrupt(format!(
                            "unknown RangeKind tag: {kt}"
                        )));
                    }
                }
            }
            30 => DataType::Hstore,
            31 => DataType::IntArray2D,
            32 => DataType::BigIntArray2D,
            33 => DataType::TextArray2D,
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown data type tag: {other}"
                )));
            }
        };
        Ok(ty)
    }
}
/// Returns the byte length of `row`'s dense body under `schema`: the NULL
/// bitmap (one bit per schema column, rounded up to whole bytes) plus the
/// encoded length of every non-NULL value.
///
/// In debug builds the row arity is asserted to match the schema.
pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
    debug_assert_eq!(
        row.values.len(),
        schema.columns.len(),
        "row_body_encoded_len: row arity must match schema"
    );
    let mut total = schema.columns.len().div_ceil(8);
    for (col_idx, value) in row.values.iter().enumerate() {
        // NULLs live only in the bitmap and contribute no payload bytes.
        if !matches!(value, Value::Null) {
            total += value_body_encoded_len(value, schema.columns[col_idx].ty);
        }
    }
    total
}
/// Returns the number of bytes `write_value_body` emits for `v`.
///
/// The lengths mirror the writers in this file:
/// * Strings and byte runs written via `write_str` / `write_bytes_escaped`
///   use a 2-byte length, or the 6-byte escaped form once the payload
///   reaches `STR_LEN_ESCAPE`. This applies to text, JSON, bytes, `TEXT[]`
///   items and tsvector words alike.
/// * 2-D arrays cost an 8-byte header plus one flag byte per cell, and only
///   non-NULL cells carry a payload.
///
/// `_ty` is accepted for signature compatibility and is not used.
///
/// # Panics
///
/// Panics for `Value::Interval`, which has no encoding here.
fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
    // Length of a length-prefixed payload as written by `write_str` and
    // `write_bytes_escaped`.
    fn escaped_len(payload: usize) -> usize {
        if payload >= STR_LEN_ESCAPE as usize {
            6 + payload
        } else {
            2 + payload
        }
    }

    match v {
        Value::SmallInt(_) => 2,
        Value::Int(_) | Value::Date(_) => 4,
        Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
        Value::Bool(_) => 1,
        Value::Text(s) | Value::Json(s) => escaped_len(s.len()),
        Value::Vector(vec) => 4 + 4 * vec.len(),
        Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
        Value::HalfVector(h) => 4 + h.bytes.len(),
        Value::Numeric { .. } => 16 + 1,
        Value::Bytes(b) => escaped_len(b.len()),
        Value::TextArray(items) => {
            // u16 count, then a flag byte per item and the escaped string
            // for non-NULL items.
            2 + items
                .iter()
                .map(|item| 1 + item.as_ref().map_or(0, |s| escaped_len(s.len())))
                .sum::<usize>()
        }
        Value::IntArray(items) => {
            2 + items
                .iter()
                .map(|x| if x.is_some() { 5 } else { 1 })
                .sum::<usize>()
        }
        Value::BigIntArray(items) => {
            2 + items
                .iter()
                .map(|x| if x.is_some() { 9 } else { 1 })
                .sum::<usize>()
        }
        Value::TsVector(lexs) => {
            // Per lexeme: escaped word, u16 position count, positions, weight.
            // NOTE(review): assumes each position is 2 bytes wide — confirm
            // against the element type of `TsLexeme::positions`.
            2 + lexs
                .iter()
                .map(|l| escaped_len(l.word.len()) + 2 + 2 * l.positions.len() + 1)
                .sum::<usize>()
        }
        Value::TsQuery(ast) => tsquery_encoded_len(ast),
        Value::Uuid(_) => 16,
        Value::Time(_) => 8,
        Value::Year(_) => 2,
        Value::TimeTz { .. } => 12,
        Value::Money(_) => 8,
        Value::Range { lower, upper, .. } => {
            // Flags byte plus each present bound as a tagged value.
            1 + lower.as_ref().map_or(0, |b| write_value_encoded_len(b))
                + upper.as_ref().map_or(0, |b| write_value_encoded_len(b))
        }
        Value::Hstore(pairs) => {
            let mut n = 4;
            for (k, v) in pairs {
                n += 4 + k.len() + 1;
                if let Some(val) = v {
                    n += 4 + val.len();
                }
            }
            n
        }
        Value::IntArray2D(rows) => {
            8 + rows
                .iter()
                .flatten()
                .map(|cell| if cell.is_some() { 1 + 4 } else { 1 })
                .sum::<usize>()
        }
        Value::BigIntArray2D(rows) => {
            8 + rows
                .iter()
                .flatten()
                .map(|cell| if cell.is_some() { 1 + 8 } else { 1 })
                .sum::<usize>()
        }
        Value::TextArray2D(rows) => {
            8 + rows
                .iter()
                .flatten()
                .map(|cell| 1 + cell.as_ref().map_or(0, |s| 4 + s.len()))
                .sum::<usize>()
        }
        Value::Null => 0,
        Value::Interval { .. } => {
            unreachable!("Value::Interval has no on-disk encoding")
        }
    }
}
/// Encodes `row` as a dense body: a NULL bitmap with one bit per schema
/// column (bit `i % 8` of byte `i / 8` is set when value `i` is NULL),
/// followed by the bodies of the non-NULL values in column order.
///
/// In debug builds the row arity is asserted to match the schema.
pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
    debug_assert_eq!(
        row.values.len(),
        schema.columns.len(),
        "dense encode: row arity must match schema"
    );
    let column_total = schema.columns.len();
    let bitmap_bytes = column_total.div_ceil(8);
    let mut out = Vec::with_capacity(bitmap_bytes + column_total * 8);
    // The bitmap occupies the first `bitmap_bytes` bytes; value bodies are
    // appended behind it, so bits can be set while the values are written.
    out.resize(bitmap_bytes, 0);
    for (col_idx, value) in row.values.iter().enumerate() {
        if matches!(value, Value::Null) {
            out[col_idx / 8] |= 1 << (col_idx % 8);
        } else {
            write_value_body(&mut out, value, schema.columns[col_idx].ty);
        }
    }
    out
}
/// Decodes one dense row body from the start of `bytes`.
///
/// Reads the NULL bitmap (one bit per schema column), then a value body for
/// every column whose bit is clear. Returns the row together with the number
/// of bytes consumed.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when the bitmap would exceed 32 bytes and
/// propagates cursor/value decoding errors.
pub fn decode_row_body_dense(
    bytes: &[u8],
    schema: &TableSchema,
    codec_version: u8,
) -> Result<(Row, usize), StorageError> {
    const BITMAP_CAP: usize = 32;

    let mut cur = Cursor::new(bytes).with_codec_version(codec_version);
    let column_total = schema.columns.len();
    let bitmap_bytes = column_total.div_ceil(8);
    if bitmap_bytes > BITMAP_CAP {
        return Err(StorageError::Corrupt(format!(
            "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
        )));
    }
    // The slice borrows from `bytes`, not from the cursor, so the cursor
    // stays usable while the bitmap is consulted.
    let null_bitmap = cur.take(bitmap_bytes)?;
    let mut values = Vec::with_capacity(column_total);
    for (col_idx, col) in schema.columns.iter().enumerate() {
        let is_null = (null_bitmap[col_idx / 8] & (1 << (col_idx % 8))) != 0;
        let value = if is_null {
            Value::Null
        } else {
            cur.read_value_body(col.ty)?
        };
        values.push(value);
    }
    Ok((Row { values }, cur.pos))
}
fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
match (v, ty) {
(Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
(Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
(Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
(Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
(Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
(Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
write_str(out, s);
}
(
Value::Vector(v),
DataType::Vector {
encoding: VecEncoding::F32,
..
},
) => {
let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
for x in v {
out.extend_from_slice(&x.to_le_bytes());
}
}
(
Value::Sq8Vector(q),
DataType::Vector {
encoding: VecEncoding::Sq8,
..
},
) => {
let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
out.extend_from_slice(&q.min.to_le_bytes());
out.extend_from_slice(&q.max.to_le_bytes());
out.extend_from_slice(&q.bytes);
}
(
Value::HalfVector(h),
DataType::Vector {
encoding: VecEncoding::F16,
..
},
) => {
let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
out.extend_from_slice(&h.bytes);
}
(Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
out.extend_from_slice(&scaled.to_le_bytes());
out.push(scale);
}
(Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
(Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
out.extend_from_slice(&t.to_le_bytes())
}
(Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
(Value::Bytes(b), DataType::Bytes) => write_bytes_escaped(out, b),
(Value::TextArray(items), DataType::TextArray) => {
let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
out.extend_from_slice(&count.to_le_bytes());
for item in items {
match item {
None => out.push(1),
Some(s) => {
out.push(0);
write_bytes_escaped(out, s.as_bytes());
}
}
}
}
(Value::IntArray(items), DataType::IntArray) => {
let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
out.extend_from_slice(&count.to_le_bytes());
for item in items {
match item {
None => out.push(1),
Some(n) => {
out.push(0);
out.extend_from_slice(&n.to_le_bytes());
}
}
}
}
(Value::BigIntArray(items), DataType::BigIntArray) => {
let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
out.extend_from_slice(&count.to_le_bytes());
for item in items {
match item {
None => out.push(1),
Some(n) => {
out.push(0);
out.extend_from_slice(&n.to_le_bytes());
}
}
}
}
(Value::TsVector(lexs), DataType::TsVector) => write_tsvector_body(out, lexs),
(Value::TsQuery(ast), DataType::TsQuery) => write_tsquery_body(out, ast),
(Value::Uuid(b), DataType::Uuid) => out.extend_from_slice(&b[..]),
(Value::Time(us), DataType::Time) => out.extend_from_slice(&us.to_le_bytes()),
(Value::Year(y), DataType::Year) => out.extend_from_slice(&y.to_le_bytes()),
(Value::TimeTz { us, offset_secs }, DataType::TimeTz) => {
out.extend_from_slice(&us.to_le_bytes());
out.extend_from_slice(&offset_secs.to_le_bytes());
}
(Value::Money(c), DataType::Money) => out.extend_from_slice(&c.to_le_bytes()),
(
Value::Range {
lower,
upper,
lower_inc,
upper_inc,
empty,
..
},
DataType::Range(_),
) => {
let mut flags: u8 = 0;
if *empty {
flags |= 0b0000_0001;
}
if lower.is_some() {
flags |= 0b0000_0010;
}
if upper.is_some() {
flags |= 0b0000_0100;
}
if *lower_inc {
flags |= 0b0000_1000;
}
if *upper_inc {
flags |= 0b0001_0000;
}
out.push(flags);
if let Some(l) = lower {
write_value(out, l);
}
if let Some(u) = upper {
write_value(out, u);
}
}
(Value::Hstore(pairs), DataType::Hstore) => write_hstore_body(out, pairs),
(Value::IntArray2D(rows), DataType::IntArray2D) => write_int_2d_body(out, rows),
(Value::BigIntArray2D(rows), DataType::BigIntArray2D) => write_bigint_2d_body(out, rows),
(Value::TextArray2D(rows), DataType::TextArray2D) => write_text_2d_body(out, rows),
(other, ty) => unreachable!(
"schema-driven encode received mismatched value/type pair: \
value tag={:?}, column type={:?}",
other.data_type(),
ty
),
}
}
fn write_value_encoded_len(v: &Value) -> usize {
match v {
Value::Null => 1,
Value::SmallInt(_) => 1 + 2,
Value::Int(_) | Value::Date(_) => 1 + 4,
Value::BigInt(_)
| Value::Float(_)
| Value::Timestamp(_)
| Value::Time(_)
| Value::Money(_) => 1 + 8,
Value::Bool(_) => 1 + 1,
Value::Year(_) => 1 + 2,
Value::Text(s) | Value::Json(s) => 1 + 4 + s.len(),
Value::Bytes(b) => 1 + 4 + b.len(),
Value::Numeric { .. } => 1 + 16 + 1,
Value::Uuid(_) => 1 + 16,
Value::TimeTz { .. } => 1 + 12,
Value::Hstore(pairs) => {
let mut n = 1 + 4;
for (k, v) in pairs {
n += 4 + k.len() + 1;
if let Some(val) = v {
n += 4 + val.len();
}
}
n
}
Value::IntArray2D(rows) => {
let cols = rows.first().map(|r| r.len()).unwrap_or(0);
1 + 8 + rows.len() * cols * (1 + 4)
}
Value::BigIntArray2D(rows) => {
let cols = rows.first().map(|r| r.len()).unwrap_or(0);
1 + 8 + rows.len() * cols * (1 + 8)
}
Value::TextArray2D(rows) => {
let cols = rows.first().map(|r| r.len()).unwrap_or(0);
let mut n = 1 + 8 + rows.len() * cols;
for row in rows {
for s in row.iter().flatten() {
n += 4 + s.len();
}
}
n
}
other => {
let ty = other.data_type().unwrap_or(DataType::Int);
1 + value_body_encoded_len(other, ty)
}
}
}
/// Appends the self-describing (tagged) encoding of `v` to `out`: one tag
/// byte followed by the payload.
///
/// `Value::Text` and `Value::Json` share tag 4. Arrays start with a u16
/// element count and prefix every element with a flag byte (1 = NULL,
/// 0 = value follows). Ranges write the kind tag, a flags byte and then the
/// present bounds as nested tagged values.
///
/// # Panics
///
/// Panics for `Value::Interval`, and when a length does not fit the width
/// it is stored in.
pub(crate) fn write_value(out: &mut Vec<u8>, v: &Value) {
    match v {
        Value::Null => out.push(0),
        Value::Int(n) => {
            out.push(1);
            out.extend_from_slice(&n.to_le_bytes());
        }
        Value::BigInt(n) => {
            out.push(2);
            out.extend_from_slice(&n.to_le_bytes());
        }
        Value::Float(x) => {
            out.push(3);
            out.extend_from_slice(&x.to_le_bytes());
        }
        Value::Text(s) | Value::Json(s) => {
            out.push(4);
            write_str(out, s);
        }
        Value::Bool(b) => {
            out.extend_from_slice(&[5, u8::from(*b)]);
        }
        Value::Vector(components) => {
            out.push(6);
            let dim = u32::try_from(components.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            for component in components {
                out.extend_from_slice(&component.to_le_bytes());
            }
        }
        Value::SmallInt(n) => {
            out.push(7);
            out.extend_from_slice(&n.to_le_bytes());
        }
        Value::Numeric { scaled, scale } => {
            out.push(8);
            out.extend_from_slice(&scaled.to_le_bytes());
            out.push(*scale);
        }
        Value::Date(d) => {
            out.push(9);
            out.extend_from_slice(&d.to_le_bytes());
        }
        Value::Timestamp(t) => {
            out.push(10);
            out.extend_from_slice(&t.to_le_bytes());
        }
        Value::Sq8Vector(quantized) => {
            out.push(11);
            let dim = u32::try_from(quantized.bytes.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&quantized.min.to_le_bytes());
            out.extend_from_slice(&quantized.max.to_le_bytes());
            out.extend_from_slice(&quantized.bytes);
        }
        Value::HalfVector(half) => {
            out.push(12);
            let dim = u32::try_from(half.dim()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&half.bytes);
        }
        Value::Bytes(b) => {
            out.push(14);
            write_bytes_escaped(out, b);
        }
        Value::TextArray(items) => {
            out.push(15);
            let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                if let Some(s) = item {
                    out.push(0);
                    write_bytes_escaped(out, s.as_bytes());
                } else {
                    out.push(1);
                }
            }
        }
        Value::IntArray(items) => {
            out.push(16);
            let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                if let Some(n) = item {
                    out.push(0);
                    out.extend_from_slice(&n.to_le_bytes());
                } else {
                    out.push(1);
                }
            }
        }
        Value::BigIntArray(items) => {
            out.push(17);
            let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                if let Some(n) = item {
                    out.push(0);
                    out.extend_from_slice(&n.to_le_bytes());
                } else {
                    out.push(1);
                }
            }
        }
        Value::TsVector(lexs) => {
            out.push(18);
            write_tsvector_body(out, lexs);
        }
        Value::TsQuery(ast) => {
            out.push(19);
            write_tsquery_body(out, ast);
        }
        Value::Uuid(b) => {
            out.push(20);
            out.extend_from_slice(&b[..]);
        }
        Value::Time(us) => {
            out.push(21);
            out.extend_from_slice(&us.to_le_bytes());
        }
        Value::Year(y) => {
            out.push(22);
            out.extend_from_slice(&y.to_le_bytes());
        }
        Value::TimeTz { us, offset_secs } => {
            out.push(23);
            out.extend_from_slice(&us.to_le_bytes());
            out.extend_from_slice(&offset_secs.to_le_bytes());
        }
        Value::Money(c) => {
            out.push(24);
            out.extend_from_slice(&c.to_le_bytes());
        }
        Value::Range {
            kind,
            lower,
            upper,
            lower_inc,
            upper_inc,
            empty,
        } => {
            out.push(25);
            out.push(kind.tag());
            // bit 0 empty, bit 1 lower present, bit 2 upper present,
            // bit 3 lower inclusive, bit 4 upper inclusive.
            let flags = u8::from(*empty)
                | (u8::from(lower.is_some()) << 1)
                | (u8::from(upper.is_some()) << 2)
                | (u8::from(*lower_inc) << 3)
                | (u8::from(*upper_inc) << 4);
            out.push(flags);
            for bound in [lower, upper] {
                if let Some(b) = bound {
                    write_value(out, b);
                }
            }
        }
        Value::Hstore(pairs) => {
            out.push(26);
            write_hstore_body(out, pairs);
        }
        Value::IntArray2D(rows) => {
            out.push(27);
            write_int_2d_body(out, rows);
        }
        Value::BigIntArray2D(rows) => {
            out.push(28);
            write_bigint_2d_body(out, rows);
        }
        Value::TextArray2D(rows) => {
            out.push(29);
            write_text_2d_body(out, rows);
        }
        Value::Interval { .. } => {
            unreachable!(
                "Value::Interval has no on-disk encoding; engine must reject it before write"
            )
        }
    }
}
/// Appends a 2-D `i32` grid to `out`: row count (u32), column count (u32,
/// taken from the first row, 0 when there are no rows), then every cell in
/// row-major order as a flag byte (1 = NULL, 0 = value) followed by the
/// little-endian value for non-NULL cells.
///
/// # Panics
///
/// Panics when the row or column count does not fit in a `u32`.
fn write_int_2d_body(out: &mut Vec<u8>, rows: &[Vec<Option<i32>>]) {
    let row_total = u32::try_from(rows.len()).expect("≤ 4G rows");
    let col_total = u32::try_from(rows.first().map_or(0, Vec::len)).expect("≤ 4G cols");
    out.extend_from_slice(&row_total.to_le_bytes());
    out.extend_from_slice(&col_total.to_le_bytes());
    for cell in rows.iter().flatten() {
        if let Some(n) = cell {
            out.push(0);
            out.extend_from_slice(&n.to_le_bytes());
        } else {
            out.push(1);
        }
    }
}
/// Appends a 2-D `i64` grid to `out`: row count (u32), column count (u32,
/// taken from the first row, 0 when there are no rows), then every cell in
/// row-major order as a flag byte (1 = NULL, 0 = value) followed by the
/// little-endian value for non-NULL cells.
///
/// # Panics
///
/// Panics when the row or column count does not fit in a `u32`.
fn write_bigint_2d_body(out: &mut Vec<u8>, rows: &[Vec<Option<i64>>]) {
    let row_total = u32::try_from(rows.len()).expect("≤ 4G rows");
    let col_total = u32::try_from(rows.first().map_or(0, Vec::len)).expect("≤ 4G cols");
    out.extend_from_slice(&row_total.to_le_bytes());
    out.extend_from_slice(&col_total.to_le_bytes());
    for cell in rows.iter().flatten() {
        if let Some(n) = cell {
            out.push(0);
            out.extend_from_slice(&n.to_le_bytes());
        } else {
            out.push(1);
        }
    }
}
/// Appends a 2-D string grid to `out`: row count (u32), column count (u32,
/// taken from the first row, 0 when there are no rows), then every cell in
/// row-major order as a flag byte (1 = NULL, 0 = value); non-NULL cells are
/// followed by a u32 byte length and the UTF-8 bytes.
///
/// # Panics
///
/// Panics when a count or a cell length does not fit in a `u32`.
fn write_text_2d_body(out: &mut Vec<u8>, rows: &[Vec<Option<String>>]) {
    let row_total = u32::try_from(rows.len()).expect("≤ 4G rows");
    let col_total = u32::try_from(rows.first().map_or(0, Vec::len)).expect("≤ 4G cols");
    out.extend_from_slice(&row_total.to_le_bytes());
    out.extend_from_slice(&col_total.to_le_bytes());
    for cell in rows.iter().flatten() {
        let Some(text) = cell else {
            out.push(1);
            continue;
        };
        out.push(0);
        let byte_len = u32::try_from(text.len()).expect("≤ 4 GiB cell");
        out.extend_from_slice(&byte_len.to_le_bytes());
        out.extend_from_slice(text.as_bytes());
    }
}
/// Appends an hstore body to `out`: pair count (u32), then for each pair the
/// key (u32 length + bytes) and a presence byte for the value (0 = NULL,
/// 1 = present); present values follow as u32 length + bytes.
///
/// # Panics
///
/// Panics when the pair count or any key/value length does not fit in a
/// `u32`.
fn write_hstore_body(out: &mut Vec<u8>, pairs: &[(String, Option<String>)]) {
    let pair_total = u32::try_from(pairs.len()).expect("hstore ≤ u32::MAX pairs");
    out.extend_from_slice(&pair_total.to_le_bytes());
    for (key, value) in pairs {
        let key_len = u32::try_from(key.len()).expect("hstore key ≤ 4 GiB");
        out.extend_from_slice(&key_len.to_le_bytes());
        out.extend_from_slice(key.as_bytes());
        let Some(text) = value else {
            out.push(0);
            continue;
        };
        out.push(1);
        let value_len = u32::try_from(text.len()).expect("hstore val ≤ 4 GiB");
        out.extend_from_slice(&value_len.to_le_bytes());
        out.extend_from_slice(text.as_bytes());
    }
}
/// Appends a tsvector body to `out`: lexeme count (u16), then for each
/// lexeme its word (length-prefixed via `write_bytes_escaped`), position
/// count (u16), the little-endian positions, and the weight byte.
///
/// # Panics
///
/// Panics when the lexeme count or a position count does not fit in a `u16`.
fn write_tsvector_body(out: &mut Vec<u8>, lexs: &[TsLexeme]) {
    let lexeme_total = u16::try_from(lexs.len()).expect("tsvector ≤ 65k lexemes");
    out.extend_from_slice(&lexeme_total.to_le_bytes());
    for lexeme in lexs {
        write_bytes_escaped(out, lexeme.word.as_bytes());
        let position_total =
            u16::try_from(lexeme.positions.len()).expect("tsvector pos count ≤ 65k");
        out.extend_from_slice(&position_total.to_le_bytes());
        for position in &lexeme.positions {
            out.extend_from_slice(&position.to_le_bytes());
        }
        out.push(lexeme.weight);
    }
}
/// Appends the pre-order encoding of the tsquery tree `ast` to `out`.
///
/// Each node starts with a tag byte: 0 = term (word + weight mask),
/// 1 = AND, 2 = OR, 3 = NOT, 4 = phrase (little-endian distance before the
/// operands). Operands follow their node recursively, left before right.
fn write_tsquery_body(out: &mut Vec<u8>, ast: &TsQueryAst) {
    match ast {
        TsQueryAst::Term { word, weight_mask } => {
            out.push(0);
            write_bytes_escaped(out, word.as_bytes());
            out.push(*weight_mask);
        }
        TsQueryAst::And(lhs, rhs) => {
            out.push(1);
            write_tsquery_body(out, lhs);
            write_tsquery_body(out, rhs);
        }
        TsQueryAst::Or(lhs, rhs) => {
            out.push(2);
            write_tsquery_body(out, lhs);
            write_tsquery_body(out, rhs);
        }
        TsQueryAst::Not(operand) => {
            out.push(3);
            write_tsquery_body(out, operand);
        }
        TsQueryAst::Phrase {
            left,
            right,
            distance,
        } => {
            out.push(4);
            out.extend_from_slice(&distance.to_le_bytes());
            for side in [left, right] {
                write_tsquery_body(out, side);
            }
        }
    }
}
fn tsquery_encoded_len(ast: &TsQueryAst) -> usize {
match ast {
TsQueryAst::Term { word, .. } => 1 + 2 + word.len() + 1,
TsQueryAst::And(a, b) | TsQueryAst::Or(a, b) => {
1 + tsquery_encoded_len(a) + tsquery_encoded_len(b)
}
TsQueryAst::Not(x) => 1 + tsquery_encoded_len(x),
TsQueryAst::Phrase { left, right, .. } => {
1 + 2 + tsquery_encoded_len(left) + tsquery_encoded_len(right)
}
}
}
/// Appends `n` to `out` as two little-endian bytes.
pub(crate) fn write_u16(out: &mut Vec<u8>, n: u16) {
    let [low, high] = n.to_le_bytes();
    out.push(low);
    out.push(high);
}
/// Appends `n` to `out` as four little-endian bytes.
pub(crate) fn write_u32(out: &mut Vec<u8>, n: u32) {
    let encoded: [u8; 4] = n.to_le_bytes();
    out.extend(encoded);
}
/// Sentinel value of the 2-byte length prefix used by `write_str` and
/// `write_bytes_escaped`: payloads at least this long are written as this
/// marker followed by a u32 length instead of a plain u16 length.
pub(crate) const STR_LEN_ESCAPE: u16 = u16::MAX;
/// Appends `b` to `out` behind a length prefix.
///
/// Payloads shorter than `STR_LEN_ESCAPE` bytes get a plain little-endian
/// `u16` length. Anything longer (or exactly that long) gets the
/// `STR_LEN_ESCAPE` sentinel followed by the length as a `u32`.
///
/// # Panics
///
/// Panics if `b` is longer than `u32::MAX` bytes.
fn write_bytes_escaped(out: &mut Vec<u8>, b: &[u8]) {
    match u16::try_from(b.len()) {
        // Fits in a `u16` and does not collide with the sentinel.
        Ok(short) if short != STR_LEN_ESCAPE => write_u16(out, short),
        _ => {
            let wide = u32::try_from(b.len()).expect("cell fits in u32 (4 GiB cap)");
            write_u16(out, STR_LEN_ESCAPE);
            write_u32(out, wide);
        }
    }
    out.extend_from_slice(b);
}
/// Appends `s` to `out` as length-prefixed UTF-8.
///
/// Strings shorter than `STR_LEN_ESCAPE` bytes use a little-endian `u16`
/// length; longer ones use the `STR_LEN_ESCAPE` sentinel followed by a `u32`
/// length.
///
/// # Panics
///
/// Panics if `s` is longer than `u32::MAX` bytes.
pub(crate) fn write_str(out: &mut Vec<u8>, s: &str) {
    let n = s.len();
    if n < usize::from(STR_LEN_ESCAPE) {
        write_u16(out, n as u16);
    } else {
        let wide = u32::try_from(n).expect("text fits in u32 (4 GiB cap)");
        write_u16(out, STR_LEN_ESCAPE);
        write_u32(out, wide);
    }
    out.extend_from_slice(s.as_bytes());
}
/// Appends `s` to `out` as a little-endian `u32` byte length followed by the
/// UTF-8 bytes.
///
/// # Panics
///
/// Panics if `s` is longer than `u32::MAX` bytes.
pub(crate) fn write_str_long(out: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let n = u32::try_from(payload.len()).expect("function body fits in u32");
    write_u32(out, n);
    out.extend_from_slice(payload);
}
/// Appends an index key to `out`: a one-byte variant tag followed by the
/// variant's payload.
///
/// Payloads: `Int` — little-endian bytes of the integer; `Text` — a
/// `write_str` length-prefixed string; `Bool` — a single `0`/`1` byte;
/// `Uuid` — the raw bytes of the id.
pub(crate) fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
    let tag = match key {
        IndexKey::Int(_) => INDEX_KEY_TAG_INT,
        IndexKey::Text(_) => INDEX_KEY_TAG_TEXT,
        IndexKey::Bool(_) => INDEX_KEY_TAG_BOOL,
        IndexKey::Uuid(_) => INDEX_KEY_TAG_UUID,
    };
    out.push(tag);
    match key {
        IndexKey::Int(n) => out.extend_from_slice(&n.to_le_bytes()),
        IndexKey::Text(text) => write_str(out, text),
        IndexKey::Bool(flag) => out.push(u8::from(*flag)),
        IndexKey::Uuid(id) => out.extend_from_slice(&id[..]),
    }
}
/// Forward-only reader over a borrowed byte buffer.
///
/// Every `read_*` method goes through `take`, which bounds-checks the request
/// and advances `pos`.
pub(crate) struct Cursor<'a> {
    /// The bytes being decoded.
    buf: &'a [u8],
    /// Offset into `buf` of the next unread byte.
    pub(crate) pos: usize,
    /// Codec version of the data being read; gates the long-length escape in
    /// `read_str` (>= 46) and `read_len_escaped_v47` (>= 47).
    pub(crate) codec_version: u8,
}
impl<'a> Cursor<'a> {
/// Creates a cursor positioned at the start of `buf` with codec version 0.
///
/// Use `with_codec_version` to set the version that gates the long-length
/// escapes.
pub(crate) const fn new(buf: &'a [u8]) -> Self {
    Self {
        codec_version: 0,
        pos: 0,
        buf,
    }
}
pub(crate) const fn with_codec_version(mut self, v: u8) -> Self {
self.codec_version = v;
self
}
/// Returns the next `n` bytes and advances the cursor past them.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when `pos + n` overflows `usize` or
/// reaches past the end of the buffer. The cursor does not move on error.
pub(crate) fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
    let data: &'a [u8] = self.buf;
    let start = self.pos;
    let stop = match start.checked_add(n) {
        Some(stop) => stop,
        None => {
            return Err(StorageError::Corrupt(format!(
                "length overflow taking {n} bytes"
            )));
        }
    };
    match data.get(start..stop) {
        Some(chunk) => {
            self.pos = stop;
            Ok(chunk)
        }
        None => Err(StorageError::Corrupt(format!(
            "unexpected EOF at offset {} (wanted {n} more bytes)",
            start
        ))),
    }
}
/// Reads one byte.
pub(crate) fn read_u8(&mut self) -> Result<u8, StorageError> {
    self.take(1).map(|b| b[0])
}
/// Reads a little-endian `u16`.
pub(crate) fn read_u16(&mut self) -> Result<u16, StorageError> {
    self.take(2).map(|b| u16::from_le_bytes([b[0], b[1]]))
}
/// Reads a little-endian `u32`.
pub(crate) fn read_u32(&mut self) -> Result<u32, StorageError> {
    self.take(4)
        .map(|b| u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}
/// Reads a little-endian `i32`.
pub(crate) fn read_i32(&mut self) -> Result<i32, StorageError> {
    self.take(4)
        .map(|b| i32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}
/// Reads a little-endian `u64`.
pub(crate) fn read_u64(&mut self) -> Result<u64, StorageError> {
    self.take(8)
        .map(|b| u64::from_le_bytes([b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7]]))
}
/// Reads a little-endian `i64`.
pub(crate) fn read_i64(&mut self) -> Result<i64, StorageError> {
    // `take(8)` returned exactly eight bytes, so the array conversion holds.
    self.take(8)
        .map(|b| i64::from_le_bytes(b.try_into().expect("checked")))
}
/// Reads a little-endian `f64`.
pub(crate) fn read_f64(&mut self) -> Result<f64, StorageError> {
    // `take(8)` returned exactly eight bytes, so the array conversion holds.
    self.take(8)
        .map(|b| f64::from_le_bytes(b.try_into().expect("checked")))
}
/// Reads a little-endian `f32`.
pub(crate) fn read_f32(&mut self) -> Result<f32, StorageError> {
    self.take(4)
        .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}
/// Reads a payload length prefix.
///
/// The prefix is a `u16`. When the codec version is at least 47 and the
/// `u16` equals `STR_LEN_ESCAPE`, the real length follows as a `u32`;
/// otherwise the `u16` itself is the length.
pub(crate) fn read_len_escaped_v47(&mut self) -> Result<usize, StorageError> {
    let short = self.read_u16()?;
    let escaped = short == STR_LEN_ESCAPE && self.codec_version >= 47;
    if !escaped {
        return Ok(usize::from(short));
    }
    self.read_u32().map(|wide| wide as usize)
}
/// Reads a UTF-8 string whose length prefix follows the
/// `read_len_escaped_v47` rules.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input or invalid UTF-8.
pub(crate) fn read_str_escaped_v47(&mut self) -> Result<String, StorageError> {
    let len = self.read_len_escaped_v47()?;
    let raw = self.take(len)?;
    match core::str::from_utf8(raw) {
        Ok(text) => Ok(String::from(text)),
        Err(_) => Err(StorageError::Corrupt(
            "invalid UTF-8 in cell payload".into(),
        )),
    }
}
/// Reads a length-prefixed UTF-8 string.
///
/// The prefix is a `u16`. When the codec version is at least 46 and the
/// `u16` equals `STR_LEN_ESCAPE`, the real length follows as a `u32`.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input or invalid UTF-8.
pub(crate) fn read_str(&mut self) -> Result<String, StorageError> {
    let short = self.read_u16()?;
    let len = match short {
        STR_LEN_ESCAPE if self.codec_version >= 46 => self.read_u32()? as usize,
        _ => usize::from(short),
    };
    let raw = self.take(len)?;
    match core::str::from_utf8(raw) {
        Ok(text) => Ok(String::from(text)),
        Err(_) => Err(StorageError::Corrupt(
            "invalid UTF-8 in identifier or text".into(),
        )),
    }
}
/// Reads a UTF-8 string prefixed by a little-endian `u32` byte length.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input or invalid UTF-8.
pub(crate) fn read_str_long(&mut self) -> Result<String, StorageError> {
    let len = self.read_u32()? as usize;
    let raw = self.take(len)?;
    match core::str::from_utf8(raw) {
        Ok(text) => Ok(String::from(text)),
        Err(_) => Err(StorageError::Corrupt(
            "invalid UTF-8 in long-string payload".into(),
        )),
    }
}
/// Reads an index key: a one-byte variant tag followed by that variant's
/// payload.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` for an unrecognized tag or truncated
/// payload.
pub(crate) fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
    match self.read_u8()? {
        INDEX_KEY_TAG_INT => self.read_i64().map(IndexKey::Int),
        INDEX_KEY_TAG_TEXT => self.read_str().map(IndexKey::Text),
        INDEX_KEY_TAG_BOOL => {
            let raw = self.read_u8()?;
            Ok(IndexKey::Bool(raw != 0))
        }
        INDEX_KEY_TAG_UUID => {
            let mut id = [0u8; 16];
            id.copy_from_slice(self.take(16)?);
            Ok(IndexKey::Uuid(id))
        }
        other => Err(StorageError::Corrupt(format!(
            "unknown index key tag: {other}"
        ))),
    }
}
/// Decodes an untagged value whose on-disk layout is selected by `ty`.
///
/// Unlike `read_value`, no leading tag byte is consumed; the caller supplies
/// the column type instead.
///
/// Vector payloads are bounds-checked against the buffer *before* anything
/// is allocated, so a corrupt dimension field produces
/// `StorageError::Corrupt` rather than a huge allocation request.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input, an unknown array
/// null-flag byte, a vector dimension whose byte length overflows `usize`,
/// or when `ty` is `DataType::Interval` (which this codec refuses to read
/// from disk).
pub(crate) fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
    match ty {
        DataType::SmallInt => {
            let s = self.take(2)?;
            Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
        }
        DataType::Int => Ok(Value::Int(self.read_i32()?)),
        DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
        DataType::Float => Ok(Value::Float(self.read_f64()?)),
        DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
        DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
            Ok(Value::Text(self.read_str()?))
        }
        DataType::Vector {
            encoding: VecEncoding::F32,
            ..
        } => {
            let dim = self.read_u32()? as usize;
            // Validate the whole payload with one `take` instead of
            // preallocating `dim` floats from an unverified header.
            let byte_len = dim.checked_mul(4).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "vector dimension {dim} overflows byte length"
                ))
            })?;
            let raw = self.take(byte_len)?;
            let v: Vec<f32> = raw
                .chunks_exact(4)
                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
                .collect();
            Ok(Value::Vector(v))
        }
        DataType::Vector {
            encoding: VecEncoding::Sq8,
            ..
        } => {
            let dim = self.read_u32()? as usize;
            let min = self.read_f32()?;
            let max = self.read_f32()?;
            let bytes = self.take(dim)?.to_vec();
            Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
        }
        DataType::Vector {
            encoding: VecEncoding::F16,
            ..
        } => {
            let dim = self.read_u32()? as usize;
            // Two bytes per component; checked so 32-bit targets cannot wrap.
            let byte_len = dim.checked_mul(2).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "vector dimension {dim} overflows byte length"
                ))
            })?;
            let bytes = self.take(byte_len)?.to_vec();
            Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
        }
        DataType::Numeric { .. } => {
            let s = self.take(16)?;
            let arr: [u8; 16] = s.try_into().expect("checked");
            let scaled = i128::from_le_bytes(arr);
            let scale = self.read_u8()?;
            Ok(Value::Numeric { scaled, scale })
        }
        DataType::Date => Ok(Value::Date(self.read_i32()?)),
        DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
        // Timestamptz shares the `Value::Timestamp` representation.
        DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
        DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
        DataType::Interval => {
            Err(StorageError::Corrupt(
                "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
            ))
        }
        DataType::Json => Ok(Value::Json(self.read_str()?)),
        DataType::Bytes => {
            let len = self.read_len_escaped_v47()?;
            let bytes = self.take(len)?.to_vec();
            Ok(Value::Bytes(bytes))
        }
        DataType::TextArray => {
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<String>> = Vec::with_capacity(count);
            for _ in 0..count {
                // Flag byte: 0 = value follows, 1 = NULL element.
                match self.read_u8()? {
                    0 => items.push(Some(self.read_str_escaped_v47()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "TEXT[] null flag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::TextArray(items))
        }
        DataType::IntArray => {
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
            for _ in 0..count {
                match self.read_u8()? {
                    0 => items.push(Some(self.read_i32()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "INT[] null flag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::IntArray(items))
        }
        DataType::BigIntArray => {
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
            for _ in 0..count {
                match self.read_u8()? {
                    0 => items.push(Some(self.read_i64()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "BIGINT[] null flag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::BigIntArray(items))
        }
        DataType::TsVector => Ok(Value::TsVector(self.read_tsvector_body()?)),
        DataType::TsQuery => Ok(Value::TsQuery(self.read_tsquery_body()?)),
        DataType::Uuid => {
            let s = self.take(16)?;
            let mut b = [0u8; 16];
            b.copy_from_slice(s);
            Ok(Value::Uuid(b))
        }
        DataType::Time => Ok(Value::Time(self.read_i64()?)),
        DataType::Year => Ok(Value::Year(self.read_u16()?)),
        DataType::TimeTz => {
            let us = self.read_i64()?;
            let offset_secs = self.read_i32()?;
            Ok(Value::TimeTz { us, offset_secs })
        }
        DataType::Money => Ok(Value::Money(self.read_i64()?)),
        DataType::Hstore => Ok(Value::Hstore(self.read_hstore_body()?)),
        DataType::IntArray2D => Ok(Value::IntArray2D(self.read_int_2d_body()?)),
        DataType::BigIntArray2D => Ok(Value::BigIntArray2D(self.read_bigint_2d_body()?)),
        DataType::TextArray2D => Ok(Value::TextArray2D(self.read_text_2d_body()?)),
        DataType::Range(kind) => {
            // Flag bits: 0 = empty, 1 = has lower, 2 = has upper,
            // 3 = lower inclusive, 4 = upper inclusive.
            let flags = self.read_u8()?;
            let empty = flags & 0b0000_0001 != 0;
            let has_lower = flags & 0b0000_0010 != 0;
            let has_upper = flags & 0b0000_0100 != 0;
            let lower_inc = flags & 0b0000_1000 != 0;
            let upper_inc = flags & 0b0001_0000 != 0;
            // Bounds are stored as full tagged values, lower first.
            let lower = if has_lower {
                Some(alloc::boxed::Box::new(self.read_value()?))
            } else {
                None
            };
            let upper = if has_upper {
                Some(alloc::boxed::Box::new(self.read_value()?))
            } else {
                None
            };
            Ok(Value::Range {
                kind,
                lower,
                upper,
                lower_inc,
                upper_inc,
                empty,
            })
        }
    }
}
/// Reads a two-dimensional `i32` array.
///
/// Layout: `u32` row count, `u32` column count, then row-major cells. Each
/// cell is a flag byte (`1` = NULL, anything else = value follows) and, for
/// non-NULL cells, a little-endian `i32`.
///
/// Preallocation is capped by the number of bytes left in the buffer, so a
/// corrupt header cannot request an allocation far larger than the input.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input.
pub(crate) fn read_int_2d_body(&mut self) -> Result<Vec<Vec<Option<i32>>>, StorageError> {
    let nrows = self.read_u32()? as usize;
    let ncols = self.read_u32()? as usize;
    // Every cell costs at least one byte, so the unread byte count is an
    // upper bound on how many cells can really be present.
    let remaining = self.buf.len().saturating_sub(self.pos);
    let mut rows = Vec::with_capacity(nrows.min(remaining));
    for _ in 0..nrows {
        let mut row = Vec::with_capacity(ncols.min(remaining));
        for _ in 0..ncols {
            let null = self.read_u8()?;
            row.push(if null == 1 {
                None
            } else {
                Some(self.read_i32()?)
            });
        }
        rows.push(row);
    }
    Ok(rows)
}
/// Reads a two-dimensional `i64` array.
///
/// Layout: `u32` row count, `u32` column count, then row-major cells. Each
/// cell is a flag byte (`1` = NULL, anything else = value follows) and, for
/// non-NULL cells, a little-endian `i64`.
///
/// Preallocation is capped by the number of bytes left in the buffer, so a
/// corrupt header cannot request an allocation far larger than the input.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input.
pub(crate) fn read_bigint_2d_body(&mut self) -> Result<Vec<Vec<Option<i64>>>, StorageError> {
    let nrows = self.read_u32()? as usize;
    let ncols = self.read_u32()? as usize;
    // Every cell costs at least one byte, so the unread byte count is an
    // upper bound on how many cells can really be present.
    let remaining = self.buf.len().saturating_sub(self.pos);
    let mut rows = Vec::with_capacity(nrows.min(remaining));
    for _ in 0..nrows {
        let mut row = Vec::with_capacity(ncols.min(remaining));
        for _ in 0..ncols {
            let null = self.read_u8()?;
            row.push(if null == 1 {
                None
            } else {
                Some(self.read_i64()?)
            });
        }
        rows.push(row);
    }
    Ok(rows)
}
/// Reads a two-dimensional text array.
///
/// Layout: `u32` row count, `u32` column count, then row-major cells. Each
/// cell is a flag byte (`1` = NULL, anything else = value follows) and, for
/// non-NULL cells, a `u32` byte length followed by UTF-8 bytes.
///
/// Preallocation is capped by the number of bytes left in the buffer, so a
/// corrupt header cannot request an allocation far larger than the input.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input or when a cell is not
/// valid UTF-8.
pub(crate) fn read_text_2d_body(&mut self) -> Result<Vec<Vec<Option<String>>>, StorageError> {
    let nrows = self.read_u32()? as usize;
    let ncols = self.read_u32()? as usize;
    // Every cell costs at least one byte, so the unread byte count is an
    // upper bound on how many cells can really be present.
    let remaining = self.buf.len().saturating_sub(self.pos);
    let mut rows = Vec::with_capacity(nrows.min(remaining));
    for _ in 0..nrows {
        let mut row = Vec::with_capacity(ncols.min(remaining));
        for _ in 0..ncols {
            let null = self.read_u8()?;
            if null == 1 {
                row.push(None);
            } else {
                let l = self.read_u32()? as usize;
                let bytes = self.take(l)?.to_vec();
                let s = String::from_utf8(bytes).map_err(|_| {
                    StorageError::Corrupt("2D TEXT cell is not valid UTF-8".into())
                })?;
                row.push(Some(s));
            }
        }
        rows.push(row);
    }
    Ok(rows)
}
/// Reads an hstore value as a list of `(key, optional value)` pairs.
///
/// Layout: `u32` pair count, then per pair a `u32` key length, the key
/// bytes, a has-value byte (non-zero = value follows) and, when present, a
/// `u32` value length with the value bytes.
///
/// Preallocation is capped by what the unread bytes could possibly hold, so
/// a corrupt pair count cannot request an allocation far larger than the
/// input.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input or when a key or value
/// is not valid UTF-8.
pub(crate) fn read_hstore_body(
    &mut self,
) -> Result<Vec<(String, Option<String>)>, StorageError> {
    let count = self.read_u32()? as usize;
    // A pair occupies at least 5 bytes (u32 key length + has-value byte).
    let max_pairs = self.buf.len().saturating_sub(self.pos) / 5;
    let mut out = Vec::with_capacity(count.min(max_pairs));
    for _ in 0..count {
        let klen = self.read_u32()? as usize;
        let k_bytes = self.take(klen)?.to_vec();
        let k = String::from_utf8(k_bytes)
            .map_err(|_| StorageError::Corrupt("hstore key is not valid UTF-8".into()))?;
        let has_val = self.read_u8()? != 0;
        let v = if has_val {
            let vlen = self.read_u32()? as usize;
            let v_bytes = self.take(vlen)?.to_vec();
            Some(String::from_utf8(v_bytes).map_err(|_| {
                StorageError::Corrupt("hstore value is not valid UTF-8".into())
            })?)
        } else {
            None
        };
        out.push((k, v));
    }
    Ok(out)
}
/// Reads a tsvector: a `u16` lexeme count, then per lexeme the word
/// (`read_str_escaped_v47`), a `u16` position count, that many `u16`
/// positions, and a weight byte.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input or a word that is not
/// valid UTF-8.
pub(crate) fn read_tsvector_body(&mut self) -> Result<Vec<TsLexeme>, StorageError> {
    let n_lexemes = usize::from(self.read_u16()?);
    let mut lexemes = Vec::with_capacity(n_lexemes);
    for _ in 0..n_lexemes {
        let word = self.read_str_escaped_v47()?;
        let n_positions = usize::from(self.read_u16()?);
        let positions = (0..n_positions)
            .map(|_| self.read_u16())
            .collect::<Result<Vec<u16>, StorageError>>()?;
        let weight = self.read_u8()?;
        lexemes.push(TsLexeme {
            word,
            positions,
            weight,
        });
    }
    Ok(lexemes)
}
/// Reads a tsquery AST stored in pre-order.
///
/// Node tags: `0` term (word, weight mask), `1` AND, `2` OR, `3` NOT,
/// `4` phrase (`u16` distance, then both operands).
///
/// NOTE(review): the decoder recurses once per nesting level with no depth
/// cap, so stack use grows with the nesting depth found in the input —
/// confirm callers only feed it data whose depth is bounded.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on an unknown node tag or truncated input.
pub(crate) fn read_tsquery_body(&mut self) -> Result<TsQueryAst, StorageError> {
    match self.read_u8()? {
        0 => {
            let word = self.read_str_escaped_v47()?;
            let weight_mask = self.read_u8()?;
            Ok(TsQueryAst::Term { word, weight_mask })
        }
        1 => {
            let lhs = Box::new(self.read_tsquery_body()?);
            let rhs = Box::new(self.read_tsquery_body()?);
            Ok(TsQueryAst::And(lhs, rhs))
        }
        2 => {
            let lhs = Box::new(self.read_tsquery_body()?);
            let rhs = Box::new(self.read_tsquery_body()?);
            Ok(TsQueryAst::Or(lhs, rhs))
        }
        3 => self
            .read_tsquery_body()
            .map(|operand| TsQueryAst::Not(Box::new(operand))),
        4 => {
            // The distance is stored ahead of the two operands.
            let distance = self.read_u16()?;
            let left = Box::new(self.read_tsquery_body()?);
            let right = Box::new(self.read_tsquery_body()?);
            Ok(TsQueryAst::Phrase {
                left,
                right,
                distance,
            })
        }
        other => Err(StorageError::Corrupt(format!(
            "tsquery: unknown node tag {other}"
        ))),
    }
}
/// Decodes a self-describing value: a one-byte type tag followed by the
/// payload for that tag.
///
/// Tags handled here: 0 NULL, 1 Int, 2 BigInt, 3 Float, 4 Text, 5 Bool,
/// 6 f32 vector, 7 SmallInt, 8 Numeric, 9 Date, 10 Timestamp, 11 SQ8 vector,
/// 12 half-precision vector, 14 Bytes, 15 TEXT[], 16 INT[], 17 BIGINT[],
/// 18 TsVector, 19 TsQuery, 20 Uuid, 21 Time, 22 Year, 23 TimeTz, 24 Money,
/// 25 Range, 26 Hstore, 27–29 the 2-D int / bigint / text arrays. Any other
/// tag (including 13) is rejected.
///
/// Vector payloads are bounds-checked against the buffer *before* anything
/// is allocated, so a corrupt dimension field produces
/// `StorageError::Corrupt` rather than a huge allocation request.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on an unknown tag, an unknown array
/// null-flag byte, an unknown range-kind tag, a vector dimension whose byte
/// length overflows `usize`, or truncated input.
pub(crate) fn read_value(&mut self) -> Result<Value, StorageError> {
    let tag = self.read_u8()?;
    match tag {
        0 => Ok(Value::Null),
        1 => Ok(Value::Int(self.read_i32()?)),
        2 => Ok(Value::BigInt(self.read_i64()?)),
        3 => Ok(Value::Float(self.read_f64()?)),
        4 => Ok(Value::Text(self.read_str()?)),
        5 => Ok(Value::Bool(self.read_u8()? != 0)),
        6 => {
            let dim = self.read_u32()? as usize;
            // Validate the whole payload with one `take` instead of
            // preallocating `dim` floats from an unverified header.
            let byte_len = dim.checked_mul(4).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "vector dimension {dim} overflows byte length"
                ))
            })?;
            let raw = self.take(byte_len)?;
            let v: Vec<f32> = raw
                .chunks_exact(4)
                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
                .collect();
            Ok(Value::Vector(v))
        }
        7 => {
            let s = self.take(2)?;
            Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
        }
        8 => {
            let s = self.take(16)?;
            let arr: [u8; 16] = s.try_into().expect("checked");
            let scaled = i128::from_le_bytes(arr);
            let scale = self.read_u8()?;
            Ok(Value::Numeric { scaled, scale })
        }
        9 => Ok(Value::Date(self.read_i32()?)),
        10 => Ok(Value::Timestamp(self.read_i64()?)),
        11 => {
            let dim = self.read_u32()? as usize;
            let min = self.read_f32()?;
            let max = self.read_f32()?;
            let bytes = self.take(dim)?.to_vec();
            Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
        }
        12 => {
            let dim = self.read_u32()? as usize;
            // Two bytes per component; checked so 32-bit targets cannot wrap.
            let byte_len = dim.checked_mul(2).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "vector dimension {dim} overflows byte length"
                ))
            })?;
            let bytes = self.take(byte_len)?.to_vec();
            Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
        }
        14 => {
            let len = self.read_len_escaped_v47()?;
            let bytes = self.take(len)?.to_vec();
            Ok(Value::Bytes(bytes))
        }
        15 => {
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<String>> = Vec::with_capacity(count);
            for _ in 0..count {
                // Flag byte: 0 = value follows, 1 = NULL element.
                match self.read_u8()? {
                    0 => items.push(Some(self.read_str_escaped_v47()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "TEXT[] null flag in value tag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::TextArray(items))
        }
        16 => {
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
            for _ in 0..count {
                match self.read_u8()? {
                    0 => items.push(Some(self.read_i32()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "INT[] null flag in value tag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::IntArray(items))
        }
        17 => {
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
            for _ in 0..count {
                match self.read_u8()? {
                    0 => items.push(Some(self.read_i64()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "BIGINT[] null flag in value tag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::BigIntArray(items))
        }
        18 => Ok(Value::TsVector(self.read_tsvector_body()?)),
        19 => Ok(Value::TsQuery(self.read_tsquery_body()?)),
        20 => {
            let s = self.take(16)?;
            let mut b = [0u8; 16];
            b.copy_from_slice(s);
            Ok(Value::Uuid(b))
        }
        21 => Ok(Value::Time(self.read_i64()?)),
        22 => Ok(Value::Year(self.read_u16()?)),
        23 => {
            let us = self.read_i64()?;
            let offset_secs = self.read_i32()?;
            Ok(Value::TimeTz { us, offset_secs })
        }
        24 => Ok(Value::Money(self.read_i64()?)),
        25 => {
            let kt = self.read_u8()?;
            let kind = RangeKind::from_tag(kt)
                .ok_or_else(|| StorageError::Corrupt(format!("unknown RangeKind tag: {kt}")))?;
            // Flag bits: 0 = empty, 1 = has lower, 2 = has upper,
            // 3 = lower inclusive, 4 = upper inclusive.
            let flags = self.read_u8()?;
            let empty = flags & 0b0000_0001 != 0;
            let has_lower = flags & 0b0000_0010 != 0;
            let has_upper = flags & 0b0000_0100 != 0;
            let lower_inc = flags & 0b0000_1000 != 0;
            let upper_inc = flags & 0b0001_0000 != 0;
            // Bounds are stored as full tagged values, lower first.
            let lower = if has_lower {
                Some(alloc::boxed::Box::new(self.read_value()?))
            } else {
                None
            };
            let upper = if has_upper {
                Some(alloc::boxed::Box::new(self.read_value()?))
            } else {
                None
            };
            Ok(Value::Range {
                kind,
                lower,
                upper,
                lower_inc,
                upper_inc,
                empty,
            })
        }
        26 => Ok(Value::Hstore(self.read_hstore_body()?)),
        27 => Ok(Value::IntArray2D(self.read_int_2d_body()?)),
        28 => Ok(Value::BigIntArray2D(self.read_bigint_2d_body()?)),
        29 => Ok(Value::TextArray2D(self.read_text_2d_body()?)),
        other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
    }
}
/// Reads a serialized `NswGraph`.
///
/// Wire order: `u16` `m_max_0`, `u32` entry (`u32::MAX` decodes to `None`),
/// `u8` entry level, `u32` node count followed by one level byte per node,
/// then a `u8` layer count. Each layer is a `u32` row count followed by rows
/// of `u16` length + that many `u32` entries.
///
/// `m` is not stored in the stream; the supplied value is copied into the
/// returned graph unchanged.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input.
pub(crate) fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
    let m_max_0 = usize::from(self.read_u16()?);
    let entry = match self.read_u32()? {
        u32::MAX => None,
        idx => Some(idx as usize),
    };
    let entry_level = self.read_u8()?;

    let node_count = self.read_u32()? as usize;
    let mut levels: PersistentVec<u8> = PersistentVec::new();
    for _ in 0..node_count {
        let level = self.read_u8()?;
        levels.push_mut(level);
    }

    let layer_count = usize::from(self.read_u8()?);
    let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
    for _ in 0..layer_count {
        let row_count = self.read_u32()? as usize;
        let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
        for _ in 0..row_count {
            let row_len = usize::from(self.read_u16()?);
            let mut row: Vec<u32> = Vec::with_capacity(row_len);
            for _ in 0..row_len {
                let id = self.read_u32()?;
                row.push(id);
            }
            per_layer.push_mut(row);
        }
        layers.push(per_layer);
    }

    Ok(NswGraph {
        m,
        m_max_0,
        entry,
        entry_level,
        levels,
        layers,
    })
}
}