use crate::charset::Charset;
use crate::error::{Error, Result};
use crate::value::{ColumnMeta, Value, align4};
use crate::wire::consts::sql_type;
use crate::wire::stream::FbStream;
/// Size in bytes of the null bitmap that precedes a row of `ncols` columns.
///
/// One bit is reserved per column; the bit count is rounded up to whole bytes
/// and the byte count is then passed through [`align4`].
pub fn null_bitmap_len(ncols: usize) -> usize {
    let whole_bytes = ncols.div_ceil(8);
    align4(whole_bytes)
}
/// Computes the length of a message buffer laid out for `columns`.
///
/// Columns are placed one after another. Each column's data starts at the
/// next offset that satisfies the alignment reported by `type_size_align`,
/// and is followed by a 2-byte slot aligned to 2 (presumably the null
/// indicator — confirm against the caller).
///
/// Returns the offset just past the last slot, cast to `u32`.
pub fn message_buffer_len(columns: &[ColumnMeta]) -> u32 {
    let total = columns.iter().fold(0usize, |cursor, column| {
        let (size, alignment) = type_size_align(column);
        let data_end = align_up(cursor, alignment) + size;
        // Trailing 2-byte slot, itself 2-aligned.
        align_up(data_end, 2) + 2
    });
    total as u32
}
/// Rounds `n` up to the next multiple of `alignment`.
///
/// The bit-mask trick used here only yields a multiple of `alignment` when
/// `alignment` is a power of two.
#[inline]
fn align_up(n: usize, alignment: usize) -> usize {
    let low_bits = alignment - 1;
    (n + alignment - 1) & !low_bits
}
/// Returns `(size, alignment)` in bytes for a column's slot in the message
/// buffer, keyed on the base SQL type of `col`.
///
/// Only `TEXT` and `VARYING` depend on the column's declared length; every
/// other type has a fixed size. Unrecognised types fall back to `(8, 8)`.
fn type_size_align(col: &ColumnMeta) -> (usize, usize) {
    let declared = col.length as usize;
    match sql_type::base(col.sql_type) {
        // Character data: sized from the declared length; VARYING adds two
        // bytes (presumably the length prefix).
        sql_type::TEXT => (declared, 1),
        sql_type::VARYING => (declared + 2, 2),

        // Fixed-width types, grouped by their (size, alignment) pair.
        sql_type::BOOLEAN => (1, 1),
        sql_type::SHORT => (2, 2),
        sql_type::LONG | sql_type::FLOAT | sql_type::TYPE_DATE | sql_type::TYPE_TIME => (4, 4),
        sql_type::TIMESTAMP | sql_type::BLOB | sql_type::QUAD | sql_type::ARRAY => (8, 4),
        sql_type::INT64 | sql_type::DOUBLE | sql_type::D_FLOAT | sql_type::DEC16 => (8, 8),
        sql_type::INT128 | sql_type::DEC34 => (16, 8),

        // NOTE(review): Firebird's C structs for the time-zone types appear
        // to be 8 and 12 bytes (including padding / the extended offset);
        // the 6 and 10 used here look like unpadded payload sizes — confirm.
        sql_type::TIME_TZ | sql_type::TIME_TZ_EX => (6, 4),
        sql_type::TIMESTAMP_TZ | sql_type::TIMESTAMP_TZ_EX => (10, 4),

        _ => (8, 8),
    }
}
/// Encodes one row of parameter `values` into a freshly allocated buffer.
///
/// Convenience wrapper around [`encode_row_into`]; see that function for the
/// layout and the error conditions.
pub fn encode_row(columns: &[ColumnMeta], values: &[Value], charset: Charset) -> Result<Vec<u8>> {
    let mut buffer = Vec::new();
    encode_row_into(&mut buffer, columns, values, charset).map(|()| buffer)
}
/// Appends the encoding of one row to `out`.
///
/// The row starts with a null bitmap of `null_bitmap_len(columns.len())`
/// bytes, where bit `i % 8` of byte `i / 8` is set when value `i` is null.
/// The encodings of the non-null values follow in column order; null values
/// contribute no bytes after the bitmap.
///
/// # Errors
///
/// Returns a protocol error when `values` and `columns` differ in length, or
/// propagates the error from encoding a value. In the latter case `out` is
/// truncated back to its length on entry, so no partial row is left behind.
pub fn encode_row_into(
    out: &mut Vec<u8>,
    columns: &[ColumnMeta],
    values: &[Value],
    charset: Charset,
) -> Result<()> {
    let (expected, supplied) = (columns.len(), values.len());
    if supplied != expected {
        return Err(Error::protocol(format!(
            "parameter count mismatch: statement expects {}, got {}",
            expected, supplied
        )));
    }

    // Remember where this row begins so bitmap indexing and rollback are
    // relative to it rather than to the start of `out`.
    let row_start = out.len();
    out.resize(row_start + null_bitmap_len(expected), 0);

    for (index, (column, value)) in columns.iter().zip(values).enumerate() {
        if value.is_null() {
            let bit = 1u8 << (index % 8);
            out[row_start + index / 8] |= bit;
            continue;
        }
        if let Err(err) = encode_value(out, column, value, charset) {
            out.truncate(row_start);
            return Err(err);
        }
    }
    Ok(())
}
/// Appends `v` to `out` as four big-endian bytes.
fn put_i32_be(out: &mut Vec<u8>, v: i32) {
    let be_bytes = v.to_be_bytes();
    out.extend(be_bytes);
}
/// Appends the zero bytes needed to bring a field of `data_len` bytes up to
/// `align4(data_len)` bytes.
fn put_pad(out: &mut Vec<u8>, data_len: usize) {
    let padding = align4(data_len) - data_len;
    out.resize(out.len() + padding, 0);
}
fn encode_value(out: &mut Vec<u8>, col: &ColumnMeta, val: &Value, charset: Charset) -> Result<()> {
let mismatch = || Error::protocol(format!("value does not fit column type {}", col.sql_type));
match sql_type::base(col.sql_type) {
sql_type::SHORT => put_i32_be(out, i32::from(val.as_i64().ok_or_else(mismatch)? as i16)),
sql_type::LONG => put_i32_be(out, val.as_i64().ok_or_else(mismatch)? as i32),
sql_type::INT64 => out.extend_from_slice(&val.as_i64().ok_or_else(mismatch)?.to_be_bytes()),
sql_type::INT128 => match val {
Value::Int128(v) => out.extend_from_slice(&v.to_be_bytes()),
_ => {
out.extend_from_slice(&i128::from(val.as_i64().ok_or_else(mismatch)?).to_be_bytes())
}
},
sql_type::FLOAT => match val {
Value::Float(f) => out.extend_from_slice(&f.to_bits().to_be_bytes()),
Value::Double(f) => out.extend_from_slice(&(*f as f32).to_bits().to_be_bytes()),
_ => return Err(mismatch()),
},
sql_type::DOUBLE | sql_type::D_FLOAT => match val {
Value::Double(f) => out.extend_from_slice(&f.to_bits().to_be_bytes()),
Value::Float(f) => out.extend_from_slice(&(f64::from(*f)).to_bits().to_be_bytes()),
_ => return Err(mismatch()),
},
sql_type::VARYING => {
let bytes = text_bytes(val, charset)?;
put_i32_be(out, bytes.len() as i32);
out.extend_from_slice(&bytes);
put_pad(out, bytes.len());
}
sql_type::TEXT => {
let bytes = text_bytes(val, charset)?;
let n = col.length as usize;
out.extend_from_slice(&bytes);
for _ in bytes.len()..n {
out.push(b' ');
}
put_pad(out, n.max(bytes.len()));
}
sql_type::TYPE_DATE => put_i32_be(out, expect_date(val)?),
sql_type::TYPE_TIME => put_i32_be(out, expect_time(val)? as i32),
sql_type::TIMESTAMP => match val {
Value::Timestamp(d, t) => {
put_i32_be(out, *d);
put_i32_be(out, *t as i32);
}
_ => return Err(mismatch()),
},
sql_type::BOOLEAN => {
out.push(matches!(val, Value::Bool(true)) as u8);
put_pad(out, 1);
}
sql_type::DEC16 => match val {
Value::DecFloat(d) => out.extend_from_slice(&d.to_decimal64().ok_or_else(mismatch)?),
_ => return Err(mismatch()),
},
sql_type::DEC34 => match val {
Value::DecFloat(d) => out.extend_from_slice(&d.to_decimal128().ok_or_else(mismatch)?),
_ => return Err(mismatch()),
},
sql_type::BLOB | sql_type::QUAD => match val {
Value::Blob(id) => out.extend_from_slice(&id.to_be_bytes()),
_ => return Err(mismatch()),
},
sql_type::ARRAY => match val {
Value::Array(id) => out.extend_from_slice(&id.to_be_bytes()),
_ => return Err(mismatch()),
},
sql_type::TIME_TZ | sql_type::TIME_TZ_EX => match val {
Value::TimeTz(t) => {
put_i32_be(out, t.utc_time as i32);
put_i32_be(out, i32::from(t.zone));
}
_ => return Err(mismatch()),
},
sql_type::TIMESTAMP_TZ | sql_type::TIMESTAMP_TZ_EX => match val {
Value::TimestampTz(t) => {
put_i32_be(out, t.utc_date);
put_i32_be(out, t.utc_time as i32);
put_i32_be(out, i32::from(t.zone));
}
_ => return Err(mismatch()),
},
_ => {
return Err(Error::protocol(format!(
"unsupported parameter type {}",
col.sql_type
)));
}
}
Ok(())
}
fn text_bytes(val: &Value, charset: Charset) -> Result<std::borrow::Cow<'_, [u8]>> {
use std::borrow::Cow;
match val {
Value::Text(s) => Ok(Cow::Owned(charset.encode(s))),
Value::Bytes(b) => Ok(Cow::Borrowed(b)),
_ => Err(Error::protocol("expected a text/bytes value")),
}
}
/// Extracts the date component from `val`.
///
/// Accepts a `Value::Date`, or the date half of a `Value::Timestamp`;
/// anything else is a protocol error.
fn expect_date(val: &Value) -> Result<i32> {
    match val {
        Value::Date(date) | Value::Timestamp(date, _) => Ok(*date),
        _ => Err(Error::protocol("expected a DATE value")),
    }
}
/// Extracts the time component from `val`.
///
/// Accepts a `Value::Time`, or the time half of a `Value::Timestamp`;
/// anything else is a protocol error.
fn expect_time(val: &Value) -> Result<u32> {
    match val {
        Value::Time(time) | Value::Timestamp(_, time) => Ok(*time),
        _ => Err(Error::protocol("expected a TIME value")),
    }
}
/// Reads one row from `stream` and returns one [`Value`] per column.
///
/// The null bitmap is read first. A column whose bit is set becomes
/// `Value::Null` and no further bytes are read for it; every other column is
/// decoded from the stream in order.
///
/// # Errors
///
/// Propagates the first error from reading the bitmap or decoding a value.
pub fn decode_row(
    stream: &mut FbStream,
    columns: &[ColumnMeta],
    charset: Charset,
) -> Result<Vec<Value>> {
    let null_bits = stream.read_raw(null_bitmap_len(columns.len()))?;
    columns
        .iter()
        .enumerate()
        .map(|(index, column)| {
            let mask = 1u8 << (index % 8);
            if null_bits[index / 8] & mask == 0 {
                decode_value(stream, column, charset)
            } else {
                Ok(Value::Null)
            }
        })
        .collect()
}
fn decode_value(stream: &mut FbStream, col: &ColumnMeta, charset: Charset) -> Result<Value> {
Ok(match sql_type::base(col.sql_type) {
sql_type::SHORT => Value::Short(stream.read_i32()? as i16),
sql_type::LONG => Value::Int(stream.read_i32()?),
sql_type::INT64 => Value::BigInt(stream.read_i64()?),
sql_type::INT128 => {
let b = stream.read_raw(16)?;
Value::Int128(i128::from_be_bytes(b.try_into().unwrap()))
}
sql_type::DEC16 => {
let b = stream.read_raw(8)?;
Value::DecFloat(crate::decfloat::DecFloat::from_decimal64(
b.try_into().unwrap(),
))
}
sql_type::DEC34 => {
let b = stream.read_raw(16)?;
Value::DecFloat(crate::decfloat::DecFloat::from_decimal128(
b.try_into().unwrap(),
))
}
sql_type::FLOAT => Value::Float(f32::from_bits(stream.read_i32()? as u32)),
sql_type::DOUBLE | sql_type::D_FLOAT => Value::Double(stream.read_f64()?),
sql_type::TEXT => {
let n = col.length as usize;
let raw = stream.read_raw(n)?;
stream.read_pad(n)?;
text_or_bytes(col, raw, charset)
}
sql_type::VARYING => {
let raw = stream.read_bytes()?; text_or_bytes(col, raw, charset)
}
sql_type::TYPE_DATE => Value::Date(stream.read_i32()?),
sql_type::TYPE_TIME => Value::Time(stream.read_i32()? as u32),
sql_type::TIMESTAMP => {
let date = stream.read_i32()?;
let time = stream.read_i32()? as u32;
Value::Timestamp(date, time)
}
sql_type::TIME_TZ | sql_type::TIME_TZ_EX => {
let utc_time = stream.read_i32()? as u32;
let zone = stream.read_i32()? as u16;
let offset = stream.read_i32()? as i16;
Value::TimeTz(crate::value::TimeTz {
utc_time,
zone,
offset,
})
}
sql_type::TIMESTAMP_TZ | sql_type::TIMESTAMP_TZ_EX => {
let utc_date = stream.read_i32()?;
let utc_time = stream.read_i32()? as u32;
let zone = stream.read_i32()? as u16;
let offset = stream.read_i32()? as i16;
Value::TimestampTz(crate::value::TimestampTz {
utc_date,
utc_time,
zone,
offset,
})
}
sql_type::BLOB | sql_type::QUAD => Value::Blob(stream.read_quad()?),
sql_type::ARRAY => Value::Array(stream.read_quad()?),
sql_type::BOOLEAN => {
let b = stream.read_raw(1)?;
stream.read_pad(1)?;
Value::Bool(b[0] != 0)
}
_ => {
let n = col.xdr_len();
Value::Bytes(stream.read_raw(n)?)
}
})
}
/// Turns the raw bytes of a character column into a [`Value`].
///
/// Columns whose `sub_type` equals `CS_OCTETS` (1) are returned untouched as
/// `Value::Bytes`. Everything else is decoded with `charset`; for `TEXT`
/// columns the trailing spaces are stripped from the decoded string.
fn text_or_bytes(col: &ColumnMeta, raw: Vec<u8>, charset: Charset) -> Value {
    const CS_OCTETS: i32 = 1;
    if col.sub_type == CS_OCTETS {
        return Value::Bytes(raw);
    }
    let decoded = charset.decode(&raw);
    match sql_type::base(col.sql_type) {
        sql_type::TEXT => Value::Text(decoded.trim_end_matches(' ').to_string()),
        _ => Value::Text(decoded),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a column with the given type and declared length; every other
    /// field takes its default.
    fn column_of(sql_type: i32, length: i32) -> ColumnMeta {
        ColumnMeta {
            sql_type,
            length,
            ..Default::default()
        }
    }

    #[test]
    fn buffer_len_integer_and_varchar() {
        // LONG: 4 + 2-byte slot = 6; VARYING(20): 22 + 2-byte slot = 24.
        let layout = [column_of(sql_type::LONG, 4), column_of(sql_type::VARYING, 20)];
        let total = message_buffer_len(&layout);
        assert_eq!(total, 30);
    }

    #[test]
    fn buffer_len_respects_alignment() {
        // SHORT ends at 4, INT64 must start at 8, then 8 + 2-byte slot.
        let layout = [column_of(sql_type::SHORT, 2), column_of(sql_type::INT64, 8)];
        let total = message_buffer_len(&layout);
        assert_eq!(total, 18);
    }

    #[test]
    fn encode_row_is_4_aligned() {
        let layout = [column_of(sql_type::LONG, 4), column_of(sql_type::VARYING, 20)];
        let params = [Value::Int(1), Value::Text("um".into())];
        let encoded = encode_row(&layout, &params, Charset::Utf8).unwrap();
        // Bitmap (4) + LONG (4) + length prefix (4) + "um" padded to 4.
        assert_eq!(encoded.len() % 4, 0);
        assert_eq!(encoded.len(), 16);
    }
}