use bytes::{BufMut, BytesMut};
use crate::codec::write_utf16_string;
use crate::prelude::*;
/// Type identifier byte that `TvpEncoder::encode_metadata` writes first.
pub const TVP_TYPE_ID: u8 = 0xF3;
/// Terminator byte written at the end of the metadata and by
/// `TvpEncoder::encode_end`.
pub const TVP_END_TOKEN: u8 = 0x00;
/// Marker byte that `TvpEncoder::encode_row` writes before a row's values.
pub const TVP_ROW_TOKEN: u8 = 0x01;
/// Value written (as a little-endian `u16`) in place of the column count when
/// the encoder has no columns.
pub const TVP_NULL_TOKEN: u16 = 0xFFFF;
/// Five collation bytes appended to the type info of `NVarChar` and `VarChar`
/// columns by `TvpWireType::encode_type_info`.
// NOTE(review): presumably a Latin1_General (LCID 0x0409) collation — confirm
// against the TDS collation layout before relying on it.
pub const DEFAULT_COLLATION: [u8; 5] = [0x09, 0x04, 0xD0, 0x00, 0x34];
/// Column types that can be described in table-valued parameter metadata.
///
/// [`TvpWireType::type_id`] maps every variant to its type id byte, and
/// [`TvpWireType::encode_type_info`] writes that byte followed by the
/// variant's parameters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum TvpWireType {
    /// Boolean column (type id `0x68`, length byte 1).
    Bit,
    /// Integer column (type id `0x26`).
    Int {
        /// Length byte written after the type id.
        size: u8,
    },
    /// Floating-point column (type id `0x6D`).
    Float {
        /// Length byte written after the type id.
        size: u8,
    },
    /// Decimal column (type id `0x6C`, length byte 17).
    Decimal {
        /// Precision byte written after the length byte.
        precision: u8,
        /// Scale byte written after the precision.
        scale: u8,
    },
    /// UTF-16 string column (type id `0xE7`).
    NVarChar {
        /// Maximum length, written as a little-endian `u16`. The value
        /// encoders in this module treat `0xFFFF` as the MAX form.
        max_length: u16,
    },
    /// Single-byte string column (type id `0xA7`).
    VarChar {
        /// Maximum length, written as a little-endian `u16`. The value
        /// encoders in this module treat `0xFFFF` as the MAX form.
        max_length: u16,
    },
    /// Binary column (type id `0xA5`).
    VarBinary {
        /// Maximum length, written as a little-endian `u16`. The value
        /// encoders in this module treat `0xFFFF` as the MAX form.
        max_length: u16,
    },
    /// GUID column (type id `0x24`, length byte 16).
    Guid,
    /// Date column (type id `0x28`, no further type info).
    Date,
    /// Time column (type id `0x29`).
    Time {
        /// Scale byte written after the type id.
        scale: u8,
    },
    /// Date-and-time column (type id `0x2A`).
    DateTime2 {
        /// Scale byte written after the type id.
        scale: u8,
    },
    /// Date-and-time-with-offset column (type id `0x2B`).
    DateTimeOffset {
        /// Scale byte written after the type id.
        scale: u8,
    },
    /// Money column (type id `0x6E`, length byte 8).
    Money,
    /// Small money column (type id `0x6E`, length byte 4).
    SmallMoney,
    /// Legacy date-time column (type id `0x6F`, length byte 8).
    DateTime,
    /// Legacy small date-time column (type id `0x6F`, length byte 4).
    SmallDateTime,
    /// XML column (type id `0xF1`, followed by a single zero byte).
    Xml,
}

impl TvpWireType {
    /// Returns the type id byte that identifies this column type on the wire.
    ///
    /// `Money`/`SmallMoney` share one id, as do `DateTime`/`SmallDateTime`;
    /// they are told apart by the length byte in the type info.
    #[must_use]
    pub const fn type_id(&self) -> u8 {
        match self {
            Self::Guid => 0x24,
            Self::Int { .. } => 0x26,
            Self::Date => 0x28,
            Self::Time { .. } => 0x29,
            Self::DateTime2 { .. } => 0x2A,
            Self::DateTimeOffset { .. } => 0x2B,
            Self::Bit => 0x68,
            Self::Decimal { .. } => 0x6C,
            Self::Float { .. } => 0x6D,
            Self::Money | Self::SmallMoney => 0x6E,
            Self::DateTime | Self::SmallDateTime => 0x6F,
            Self::VarBinary { .. } => 0xA5,
            Self::VarChar { .. } => 0xA7,
            Self::NVarChar { .. } => 0xE7,
            Self::Xml => 0xF1,
        }
    }

    /// Appends the type id byte followed by the type-specific parameters.
    pub fn encode_type_info(&self, buf: &mut BytesMut) {
        buf.put_u8(self.type_id());

        match *self {
            // The type id alone describes a date column.
            Self::Date => {}

            // Fixed length byte.
            Self::Bit => buf.put_u8(1),
            Self::Guid => buf.put_u8(16),
            Self::Money | Self::DateTime => buf.put_u8(8),
            Self::SmallMoney | Self::SmallDateTime => buf.put_u8(4),
            Self::Xml => buf.put_u8(0),

            // Caller-supplied single byte.
            Self::Int { size } | Self::Float { size } => buf.put_u8(size),
            Self::Time { scale }
            | Self::DateTime2 { scale }
            | Self::DateTimeOffset { scale } => buf.put_u8(scale),

            // Length byte 17, then precision and scale.
            Self::Decimal { precision, scale } => buf.put_slice(&[17, precision, scale]),

            // Two-byte maximum length; character types also carry a collation.
            Self::VarBinary { max_length } => buf.put_u16_le(max_length),
            Self::NVarChar { max_length } | Self::VarChar { max_length } => {
                buf.put_u16_le(max_length);
                buf.put_slice(&DEFAULT_COLLATION);
            }
        }
    }
}
/// Flag set attached to a table-valued parameter column definition.
#[derive(Debug, Clone, Copy, Default)]
#[non_exhaustive]
pub struct TvpColumnFlags {
    /// Whether the column accepts NULL; reported as bit `0x0001`.
    pub nullable: bool,
}

impl TvpColumnFlags {
    /// Creates a flag set with the given nullability.
    #[must_use]
    pub const fn new(nullable: bool) -> Self {
        Self { nullable }
    }

    /// Packs the flags into the 16-bit value written on the wire.
    #[must_use]
    pub const fn to_bits(&self) -> u16 {
        const NULLABLE_BIT: u16 = 0x0001;

        if self.nullable {
            NULLABLE_BIT
        } else {
            0
        }
    }
}
/// Definition of one column inside a table-valued parameter.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct TvpColumnDef {
    /// Wire type whose type info is written for this column.
    pub wire_type: TvpWireType,
    /// Flags written ahead of the type info.
    pub flags: TvpColumnFlags,
}

impl TvpColumnDef {
    /// Creates a column definition that is not nullable.
    #[must_use]
    pub const fn new(wire_type: TvpWireType) -> Self {
        Self::with_nullability(wire_type, false)
    }

    /// Creates a column definition that is nullable.
    #[must_use]
    pub const fn nullable(wire_type: TvpWireType) -> Self {
        Self::with_nullability(wire_type, true)
    }

    /// Shared constructor behind [`TvpColumnDef::new`] and
    /// [`TvpColumnDef::nullable`].
    const fn with_nullability(wire_type: TvpWireType, nullable: bool) -> Self {
        Self {
            wire_type,
            flags: TvpColumnFlags { nullable },
        }
    }

    /// Appends this column's metadata entry to `buf`.
    ///
    /// Layout: a zero `u32`, the flags as a little-endian `u16`, the wire
    /// type's type info, and a trailing zero byte.
    // NOTE(review): the leading u32 is presumably the user type and the
    // trailing byte an empty column name length — confirm against the TDS spec.
    pub fn encode(&self, buf: &mut BytesMut) {
        let flag_bits = self.flags.to_bits();

        buf.put_u32_le(0);
        buf.put_u16_le(flag_bits);
        self.wire_type.encode_type_info(buf);
        buf.put_u8(0);
    }
}
/// Writes the framing of a table-valued parameter: its metadata, the marker
/// in front of each row, and the final end token.
#[derive(Debug)]
pub struct TvpEncoder<'a> {
    /// Schema name written into the metadata.
    pub schema: &'a str,
    /// Table type name written into the metadata.
    pub type_name: &'a str,
    /// Column definitions written into the metadata, in order.
    pub columns: &'a [TvpColumnDef],
}

impl<'a> TvpEncoder<'a> {
    /// Creates an encoder that borrows the schema, type name and columns.
    #[must_use]
    pub const fn new(schema: &'a str, type_name: &'a str, columns: &'a [TvpColumnDef]) -> Self {
        Self {
            schema,
            type_name,
            columns,
        }
    }

    /// Appends the parameter metadata to `buf`.
    ///
    /// Layout: [`TVP_TYPE_ID`], a zero byte, the schema name and the type
    /// name (each as a one-byte UTF-16 code-unit count followed by the text
    /// when non-empty), the column section, and [`TVP_END_TOKEN`]. The column
    /// section is [`TVP_NULL_TOKEN`] when there are no columns, otherwise a
    /// little-endian `u16` count followed by every column definition.
    ///
    /// The name lengths are narrowed to `u8` and the column count to `u16`
    /// with `as`, so a name longer than 255 UTF-16 code units or more than
    /// 65,535 columns is written with a wrapped length.
    // NOTE(review): the zero byte after the type id is presumably an empty
    // database name length — confirm against the TDS spec.
    pub fn encode_metadata(&self, buf: &mut BytesMut) {
        buf.put_u8(TVP_TYPE_ID);
        buf.put_u8(0);

        // Schema first, then type name; both use the same framing.
        for name in [self.schema, self.type_name] {
            let unit_count = name.encode_utf16().count() as u8;
            buf.put_u8(unit_count);
            if unit_count != 0 {
                write_utf16_string(buf, name);
            }
        }

        match self.columns {
            [] => buf.put_u16_le(TVP_NULL_TOKEN),
            defs => {
                buf.put_u16_le(defs.len() as u16);
                for def in defs {
                    def.encode(buf);
                }
            }
        }

        buf.put_u8(TVP_END_TOKEN);
    }

    /// Appends one row: [`TVP_ROW_TOKEN`] followed by whatever
    /// `encode_values` writes into `buf`.
    pub fn encode_row<F: FnOnce(&mut BytesMut)>(&self, buf: &mut BytesMut, encode_values: F) {
        buf.put_u8(TVP_ROW_TOKEN);
        encode_values(buf);
    }

    /// Appends [`TVP_END_TOKEN`] to mark the end of the rows.
    pub fn encode_end(&self, buf: &mut BytesMut) {
        buf.put_u8(TVP_END_TOKEN);
    }
}
/// Appends the NULL encoding for a column of the given wire type.
///
/// * `Xml`, and `NVarChar`/`VarChar`/`VarBinary` with `max_length == 0xFFFF`:
///   eight `0xFF` bytes.
/// * `NVarChar`/`VarChar`/`VarBinary` with any other `max_length`: two `0xFF`
///   bytes.
/// * Every other type: a single zero byte.
pub fn encode_tvp_null(wire_type: &TvpWireType, buf: &mut BytesMut) {
    match *wire_type {
        TvpWireType::Xml
        | TvpWireType::NVarChar { max_length: 0xFFFF }
        | TvpWireType::VarChar { max_length: 0xFFFF }
        | TvpWireType::VarBinary { max_length: 0xFFFF } => buf.put_u64_le(u64::MAX),

        TvpWireType::NVarChar { .. }
        | TvpWireType::VarChar { .. }
        | TvpWireType::VarBinary { .. } => buf.put_u16_le(u16::MAX),

        _ => buf.put_u8(0),
    }
}
/// Appends a non-NULL bit value: a length byte of 1 followed by `0` or `1`.
pub fn encode_tvp_bit(value: bool, buf: &mut BytesMut) {
    buf.put_slice(&[1, u8::from(value)]);
}
/// Appends a non-NULL integer: the `size` byte followed by the low `size`
/// bytes of `value` in little-endian order.
///
/// Values that do not fit in `size` bytes are truncated to their low bytes.
///
/// # Panics
///
/// Panics if `size` is not 1, 2, 4 or 8. The size byte has already been
/// written to `buf` at that point.
pub fn encode_tvp_int(value: i64, size: u8, buf: &mut BytesMut) {
    buf.put_u8(size);

    // Narrowing a two's-complement integer keeps its low-order bytes, which
    // are the leading bytes of the little-endian representation.
    let little_endian = value.to_le_bytes();
    match size {
        1 | 2 | 4 | 8 => buf.put_slice(&little_endian[..usize::from(size)]),
        _ => unreachable!("encode_tvp_int called with invalid size {size}; expected 1, 2, 4, or 8"),
    }
}
/// Appends a non-NULL floating-point value: the `size` byte followed by the
/// value as a little-endian `f32` (size 4) or `f64` (size 8).
///
/// # Panics
///
/// Panics if `size` is neither 4 nor 8. The size byte has already been
/// written to `buf` at that point.
pub fn encode_tvp_float(value: f64, size: u8, buf: &mut BytesMut) {
    buf.put_u8(size);

    if size == 4 {
        let single = value as f32;
        buf.put_slice(&single.to_le_bytes());
    } else if size == 8 {
        buf.put_slice(&value.to_le_bytes());
    } else {
        unreachable!("encode_tvp_float called with invalid size {size}; expected 4 or 8");
    }
}
/// Appends a non-NULL `NVARCHAR` value as UTF-16LE.
///
/// * `max_length == 0xFFFF` selects the MAX form: an 8-byte total length, a
///   single chunk (4-byte length plus payload) holding the whole value, and
///   a 4-byte zero terminator.
/// * Any other `max_length` writes a 2-byte length followed by the payload.
///
/// An empty MAX value is written as a zero total length followed directly by
/// the terminator. A zero chunk length *is* the terminator, so writing an
/// empty chunk header and then a terminator would leave four stray bytes in
/// the stream.
///
/// The length fields are narrowed with `as`; the caller is expected to keep
/// the payload within `max_length` so the narrowed length stays exact.
pub fn encode_tvp_nvarchar(value: &str, max_length: u16, buf: &mut BytesMut) {
    let utf16: Vec<u16> = value.encode_utf16().collect();
    let byte_len = utf16.len() * 2;

    if max_length == 0xFFFF {
        buf.put_u64_le(byte_len as u64);
        // Chunks must carry at least one byte; skip the chunk when empty.
        if byte_len > 0 {
            buf.put_u32_le(byte_len as u32);
            for code_unit in &utf16 {
                buf.put_u16_le(*code_unit);
            }
        }
        buf.put_u32_le(0);
    } else {
        buf.put_u16_le(byte_len as u16);
        for code_unit in &utf16 {
            buf.put_u16_le(*code_unit);
        }
    }
}
/// Appends a non-NULL `VARCHAR` value.
///
/// The text is converted with `encode_str_for_collation(value, None)` and the
/// resulting bytes are framed like this:
///
/// * `max_length == 0xFFFF` selects the MAX form: an 8-byte total length, a
///   single chunk (4-byte length plus payload) holding the whole value, and
///   a 4-byte zero terminator.
/// * Any other `max_length` writes a 2-byte length followed by the payload.
///
/// An empty MAX value is written as a zero total length followed directly by
/// the terminator. A zero chunk length *is* the terminator, so writing an
/// empty chunk header and then a terminator would leave four stray bytes in
/// the stream.
///
/// The length fields are narrowed with `as`; the caller is expected to keep
/// the payload within `max_length` so the narrowed length stays exact.
pub fn encode_tvp_varchar(value: &str, max_length: u16, buf: &mut BytesMut) {
    let encoded = crate::collation::encode_str_for_collation(value, None);
    let byte_len = encoded.len();

    if max_length == 0xFFFF {
        buf.put_u64_le(byte_len as u64);
        // Chunks must carry at least one byte; skip the chunk when empty.
        if byte_len > 0 {
            buf.put_u32_le(byte_len as u32);
            buf.put_slice(&encoded);
        }
        buf.put_u32_le(0);
    } else {
        buf.put_u16_le(byte_len as u16);
        buf.put_slice(&encoded);
    }
}
/// Appends a non-NULL `VARBINARY` value.
///
/// * `max_length == 0xFFFF` selects the MAX form: an 8-byte total length, a
///   single chunk (4-byte length plus payload) holding the whole value, and
///   a 4-byte zero terminator.
/// * Any other `max_length` writes a 2-byte length followed by the payload.
///
/// An empty MAX value is written as a zero total length followed directly by
/// the terminator. A zero chunk length *is* the terminator, so writing an
/// empty chunk header and then a terminator would leave four stray bytes in
/// the stream.
///
/// The length fields are narrowed with `as`; the caller is expected to keep
/// the payload within `max_length` so the narrowed length stays exact.
pub fn encode_tvp_varbinary(value: &[u8], max_length: u16, buf: &mut BytesMut) {
    if max_length == 0xFFFF {
        buf.put_u64_le(value.len() as u64);
        // Chunks must carry at least one byte; skip the chunk when empty.
        if !value.is_empty() {
            buf.put_u32_le(value.len() as u32);
            buf.put_slice(value);
        }
        buf.put_u32_le(0);
    } else {
        buf.put_u16_le(value.len() as u16);
        buf.put_slice(value);
    }
}
/// Appends a non-NULL GUID: a length byte of 16 followed by the 16 bytes.
///
/// The first three groups (bytes 0-3, 4-5 and 6-7) are written with their
/// bytes reversed; the last eight bytes are copied unchanged.
pub fn encode_tvp_guid(uuid_bytes: &[u8; 16], buf: &mut BytesMut) {
    let b = uuid_bytes;

    buf.put_u8(16);
    buf.put_slice(&[b[3], b[2], b[1], b[0], b[5], b[4], b[7], b[6]]);
    buf.put_slice(&b[8..]);
}
/// Appends the low three bytes of `days` in little-endian order.
///
/// The top byte of `days` is discarded.
// NOTE(review): unlike the other value encoders in this file, no length byte
// is written before the payload, while `encode_tvp_null` emits a one-byte
// zero length for this type. Confirm that the caller writes the length byte
// (3) itself.
pub fn encode_tvp_date(days: u32, buf: &mut BytesMut) {
    let little_endian = days.to_le_bytes();
    buf.put_slice(&little_endian[..3]);
}
/// Appends a non-NULL time value: a length byte followed by that many
/// low-order bytes of `intervals` in little-endian order.
///
/// The length is 3 for scales 0-2, 4 for scales 3-4 and 5 for every other
/// scale.
pub fn encode_tvp_time(intervals: u64, scale: u8, buf: &mut BytesMut) {
    let byte_count: usize = match scale {
        0..=2 => 3,
        3 | 4 => 4,
        _ => 5,
    };

    buf.put_u8(byte_count as u8);
    buf.put_slice(&intervals.to_le_bytes()[..byte_count]);
}
/// Appends a non-NULL datetime2 value: a length byte, the time part, then
/// the date part.
///
/// The time part is the low 3 (scales 0-2), 4 (scales 3-4) or 5 (any other
/// scale) bytes of `time_intervals`; the date part is the low three bytes of
/// `days`. Both are little-endian, and the length byte is their combined
/// size.
pub fn encode_tvp_datetime2(time_intervals: u64, days: u32, scale: u8, buf: &mut BytesMut) {
    const DATE_BYTES: usize = 3;

    let time_bytes: usize = match scale {
        0..=2 => 3,
        3 | 4 => 4,
        _ => 5,
    };

    buf.put_u8((time_bytes + DATE_BYTES) as u8);
    buf.put_slice(&time_intervals.to_le_bytes()[..time_bytes]);
    buf.put_slice(&days.to_le_bytes()[..DATE_BYTES]);
}
/// Appends a non-NULL datetimeoffset value: a length byte, the time part,
/// the date part, then the offset.
///
/// The time part is the low 3 (scales 0-2), 4 (scales 3-4) or 5 (any other
/// scale) bytes of `time_intervals`; the date part is the low three bytes of
/// `days`; the offset is `offset_minutes` as a two-byte signed integer. All
/// parts are little-endian, and the length byte is their combined size.
pub fn encode_tvp_datetimeoffset(
    time_intervals: u64,
    days: u32,
    offset_minutes: i16,
    scale: u8,
    buf: &mut BytesMut,
) {
    const DATE_BYTES: usize = 3;
    const OFFSET_BYTES: usize = 2;

    let time_bytes: usize = match scale {
        0..=2 => 3,
        3 | 4 => 4,
        _ => 5,
    };

    buf.put_u8((time_bytes + DATE_BYTES + OFFSET_BYTES) as u8);
    buf.put_slice(&time_intervals.to_le_bytes()[..time_bytes]);
    buf.put_slice(&days.to_le_bytes()[..DATE_BYTES]);
    buf.put_slice(&offset_minutes.to_le_bytes());
}
/// Appends a non-NULL decimal value: a length byte of 17, the `sign` byte,
/// and `mantissa` as 16 little-endian bytes.
pub fn encode_tvp_decimal(sign: u8, mantissa: u128, buf: &mut BytesMut) {
    buf.put_slice(&[17, sign]);
    buf.put_slice(&mantissa.to_le_bytes());
}
/// Appends a non-NULL money value: a length byte of 8, then the high 32 bits
/// of `scaled` followed by its low 32 bits, each as four little-endian bytes.
pub fn encode_tvp_money(scaled: i64, buf: &mut BytesMut) {
    // In the little-endian layout of an i64 the low half occupies bytes 0..4
    // and the high half bytes 4..8; the wire order is high half first.
    let raw = scaled.to_le_bytes();
    let (low_half, high_half) = raw.split_at(4);

    buf.put_u8(8);
    buf.put_slice(high_half);
    buf.put_slice(low_half);
}
/// Appends a non-NULL small money value: a length byte of 4 followed by
/// `scaled` as four little-endian bytes.
pub fn encode_tvp_smallmoney(scaled: i32, buf: &mut BytesMut) {
    let payload = scaled.to_le_bytes();

    buf.put_u8(4);
    buf.put_slice(&payload);
}
/// Appends a non-NULL legacy datetime value: a length byte of 8, then `days`
/// and `ticks`, each as four little-endian bytes.
pub fn encode_tvp_datetime(days: i32, ticks: u32, buf: &mut BytesMut) {
    buf.put_u8(8);
    buf.put_slice(&days.to_le_bytes());
    buf.put_slice(&ticks.to_le_bytes());
}
/// Appends a non-NULL small datetime value: a length byte of 4, then `days`
/// and `minutes`, each as two little-endian bytes.
pub fn encode_tvp_smalldatetime(days: u16, minutes: u16, buf: &mut BytesMut) {
    buf.put_u8(4);
    buf.put_slice(&days.to_le_bytes());
    buf.put_slice(&minutes.to_le_bytes());
}
// Unit tests pinning the byte layout produced by the encoders above.
// `unwrap`/`expect` are allowed here because a failed conversion should fail
// the test.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
// Metadata starts with the TVP type id followed by a zero byte.
#[test]
fn test_tvp_metadata_encoding() {
let columns = vec![TvpColumnDef::new(TvpWireType::Int { size: 4 })];
let encoder = TvpEncoder::new("dbo", "UserIdList", &columns);
let mut buf = BytesMut::new();
encoder.encode_metadata(&mut buf);
assert_eq!(buf[0], TVP_TYPE_ID);
assert_eq!(buf[1], 0);
}
// A nullable column starts with four zero bytes and then the flags with the
// nullable bit set (little-endian u16).
#[test]
fn test_tvp_column_def_encoding() {
let col = TvpColumnDef::nullable(TvpWireType::Int { size: 4 });
let mut buf = BytesMut::new();
col.encode(&mut buf);
assert!(buf.len() >= 9);
assert_eq!(&buf[0..4], &[0, 0, 0, 0]);
assert_eq!(buf[4], 0x01);
assert_eq!(buf[5], 0x00);
}
// A bounded NVARCHAR is a two-byte length followed by the UTF-16 payload:
// "test" is 4 code units, i.e. 8 bytes.
#[test]
fn test_tvp_nvarchar_encoding() {
let mut buf = BytesMut::new();
encode_tvp_nvarchar("test", 100, &mut buf);
assert_eq!(buf.len(), 2 + 8);
assert_eq!(buf[0], 8); assert_eq!(buf[1], 0);
}
// A 4-byte integer is the size byte followed by the little-endian value.
#[test]
fn test_tvp_int_encoding() {
let mut buf = BytesMut::new();
encode_tvp_int(42, 4, &mut buf);
assert_eq!(buf.len(), 5);
assert_eq!(buf[0], 4);
assert_eq!(buf[1], 42);
}
// MONEY is a length byte of 8, then the high word, then the low word.
#[test]
fn test_tvp_money_encoding_matches_rpc_layout() {
let mut buf = BytesMut::new();
encode_tvp_money(123_400, &mut buf);
assert_eq!(buf.len(), 9, "length byte + 8-byte payload");
assert_eq!(buf[0], 8, "MONEYN length byte is 8 for MONEY");
assert_eq!(&buf[1..5], &[0, 0, 0, 0], "high word zero for small value");
assert_eq!(&buf[5..9], &123_400i32.to_le_bytes());
}
// A negative MONEY value round-trips through the high/low word split.
#[test]
fn test_tvp_money_encoding_negative_value() {
let mut buf = BytesMut::new();
encode_tvp_money(-12_300, &mut buf);
assert_eq!(buf.len(), 9);
assert_eq!(buf[0], 8);
let high = i32::from_le_bytes(buf[1..5].try_into().unwrap());
let low = u32::from_le_bytes(buf[5..9].try_into().unwrap());
let reconstructed = ((high as i64) << 32) | (low as i64 & 0xFFFF_FFFF);
assert_eq!(reconstructed, -12_300i64);
}
// The largest i64 round-trips through the high/low word split.
#[test]
fn test_tvp_money_encoding_max_value() {
let mut buf = BytesMut::new();
encode_tvp_money(i64::MAX, &mut buf);
assert_eq!(buf.len(), 9);
let high = i32::from_le_bytes(buf[1..5].try_into().unwrap());
let low = u32::from_le_bytes(buf[5..9].try_into().unwrap());
let reconstructed = ((high as i64) << 32) | (low as i64 & 0xFFFF_FFFF);
assert_eq!(reconstructed, i64::MAX);
}
// SMALLMONEY is a length byte of 4 followed by a little-endian i32.
#[test]
fn test_tvp_smallmoney_encoding() {
let mut buf = BytesMut::new();
encode_tvp_smallmoney(12_345, &mut buf);
assert_eq!(buf.len(), 5, "length byte + 4-byte payload");
assert_eq!(buf[0], 4);
assert_eq!(&buf[1..5], &12_345i32.to_le_bytes());
}
// A negative SMALLMONEY value is written as a signed little-endian i32.
#[test]
fn test_tvp_smallmoney_encoding_negative() {
let mut buf = BytesMut::new();
encode_tvp_smallmoney(-1, &mut buf);
assert_eq!(buf.len(), 5);
assert_eq!(buf[0], 4);
assert_eq!(
i32::from_le_bytes(buf[1..5].try_into().unwrap()),
-1,
"SMALLMONEY wraps as signed 32-bit LE"
);
}
// DATETIME is a length byte of 8, then the days (i32) and the ticks (u32).
#[test]
fn test_tvp_datetime_encoding() {
let mut buf = BytesMut::new();
encode_tvp_datetime(41_275, 0, &mut buf);
assert_eq!(buf.len(), 9, "length byte + 8-byte payload");
assert_eq!(buf[0], 8);
assert_eq!(&buf[1..5], &41_275i32.to_le_bytes());
assert_eq!(&buf[5..9], &0u32.to_le_bytes());
}
// A negative day count is written as a signed little-endian i32.
#[test]
fn test_tvp_datetime_encoding_pre_1900() {
let mut buf = BytesMut::new();
encode_tvp_datetime(-1, 0, &mut buf);
assert_eq!(buf.len(), 9);
assert_eq!(
i32::from_le_bytes(buf[1..5].try_into().unwrap()),
-1,
"pre-1900 DATETIME uses negative days"
);
}
// SMALLDATETIME is a length byte of 4, then the days and minutes (u16 each).
#[test]
fn test_tvp_smalldatetime_encoding() {
let mut buf = BytesMut::new();
encode_tvp_smalldatetime(43_830, 0, &mut buf);
assert_eq!(buf.len(), 5, "length byte + 4-byte payload");
assert_eq!(buf[0], 4);
assert_eq!(&buf[1..3], &43_830u16.to_le_bytes());
assert_eq!(&buf[3..5], &0u16.to_le_bytes());
}
// MONEY type info is the shared type id 0x6E with a length byte of 8.
#[test]
fn test_tvp_money_type_info_encoding() {
let mut buf = BytesMut::new();
TvpWireType::Money.encode_type_info(&mut buf);
assert_eq!(
&buf[..],
&[0x6E, 8],
"MONEY = MONEYN type_id with max_length 8"
);
}
// SMALLMONEY type info is the shared type id 0x6E with a length byte of 4.
#[test]
fn test_tvp_smallmoney_type_info_encoding() {
let mut buf = BytesMut::new();
TvpWireType::SmallMoney.encode_type_info(&mut buf);
assert_eq!(
&buf[..],
&[0x6E, 4],
"SMALLMONEY = MONEYN type_id with max_length 4"
);
}
// DATETIME type info is the shared type id 0x6F with a length byte of 8.
#[test]
fn test_tvp_datetime_type_info_encoding() {
let mut buf = BytesMut::new();
TvpWireType::DateTime.encode_type_info(&mut buf);
assert_eq!(
&buf[..],
&[0x6F, 8],
"DATETIME = DATETIMEN type_id with max_length 8"
);
}
// SMALLDATETIME type info is the shared type id 0x6F with a length byte of 4.
#[test]
fn test_tvp_smalldatetime_type_info_encoding() {
let mut buf = BytesMut::new();
TvpWireType::SmallDateTime.encode_type_info(&mut buf);
assert_eq!(
&buf[..],
&[0x6F, 4],
"SMALLDATETIME = DATETIMEN type_id with max_length 4"
);
}
// NULL for the money and legacy date-time types is a single zero byte.
#[test]
fn test_tvp_null_for_money_is_length_zero() {
let mut buf = BytesMut::new();
encode_tvp_null(&TvpWireType::Money, &mut buf);
assert_eq!(&buf[..], &[0], "MONEYN NULL is a single length-zero byte");
let mut buf = BytesMut::new();
encode_tvp_null(&TvpWireType::SmallDateTime, &mut buf);
assert_eq!(
&buf[..],
&[0],
"DATETIMEN NULL is a single length-zero byte"
);
}
}