use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use memchr::memchr;
use std::io::{self, Read};
use std::ops::Range;
use std::str;
use crate::types::Oid;
pub const PARSE_COMPLETE_TAG: u8 = b'1';
pub const BIND_COMPLETE_TAG: u8 = b'2';
pub const CLOSE_COMPLETE_TAG: u8 = b'3';
pub const NOTIFICATION_RESPONSE_TAG: u8 = b'A';
pub const COPY_DONE_TAG: u8 = b'c';
pub const COMMAND_COMPLETE_TAG: u8 = b'C';
pub const COPY_DATA_TAG: u8 = b'd';
pub const DATA_ROW_TAG: u8 = b'D';
pub const ERROR_RESPONSE_TAG: u8 = b'E';
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
pub const NO_DATA_TAG: u8 = b'n';
pub const NOTICE_RESPONSE_TAG: u8 = b'N';
pub const AUTHENTICATION_TAG: u8 = b'R';
pub const PORTAL_SUSPENDED_TAG: u8 = b's';
pub const PARAMETER_STATUS_TAG: u8 = b'S';
pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
pub const ROW_DESCRIPTION_TAG: u8 = b'T';
pub const READY_FOR_QUERY_TAG: u8 = b'Z';
#[derive(Debug, Copy, Clone)]
pub struct Header {
tag: u8,
len: i32,
}
impl Header {
#[inline]
pub fn parse(buf: &[u8]) -> io::Result<Option<Header>> {
if buf.len() < 5 {
return Ok(None);
}
let tag = buf[0];
let len = BigEndian::read_i32(&buf[1..]);
if len < 4 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid message length: header length < 4",
));
}
Ok(Some(Header { tag, len }))
}
#[inline]
#[must_use]
pub fn tag(self) -> u8 {
self.tag
}
#[inline]
#[must_use]
pub fn len(self) -> i32 {
self.len
}
#[inline]
#[must_use]
pub fn is_empty(self) -> bool {
self.len == 0
}
}
#[non_exhaustive]
#[derive(Debug)]
pub enum Message {
AuthenticationOk,
AuthenticationCleartextPassword,
AuthenticationMd5Password(AuthenticationMd5PasswordBody),
AuthenticationSasl(AuthenticationSaslBody),
AuthenticationSaslContinue(AuthenticationSaslContinueBody),
AuthenticationSaslFinal(AuthenticationSaslFinalBody),
BackendKeyData(BackendKeyDataBody),
BindComplete,
CloseComplete,
CommandComplete(CommandCompleteBody),
CopyData(CopyDataBody),
CopyDone,
CopyInResponse(CopyInResponseBody),
CopyOutResponse(CopyOutResponseBody),
DataRow(DataRowBody),
EmptyQueryResponse,
ErrorResponse(ErrorResponseBody),
NoData,
NoticeResponse(NoticeResponseBody),
NotificationResponse(NotificationResponseBody),
ParameterDescription(ParameterDescriptionBody),
ParameterStatus(ParameterStatusBody),
ParseComplete,
PortalSuspended,
ReadyForQuery(ReadyForQueryBody),
RowDescription(RowDescriptionBody),
}
impl Message {
#[inline]
pub fn parse(buf: &mut BytesMut) -> io::Result<Option<Message>> {
if buf.len() < 5 {
let to_read = 5 - buf.len();
buf.reserve(to_read);
return Ok(None);
}
let tag = buf[0];
let len = (&buf[1..5]).read_u32::<BigEndian>().unwrap();
if len < 4 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid message length: parsing u32",
));
}
let Some(total_len) = (len as usize).checked_add(1) else {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("invalid message length: {len} + 1 overflows usize"),
));
};
if buf.len() < total_len {
let to_read = total_len - buf.len();
buf.reserve(to_read);
return Ok(None);
}
let mut buf = Buffer {
bytes: buf.split_to(total_len).freeze(),
idx: 5,
};
let message = match tag {
PARSE_COMPLETE_TAG => Message::ParseComplete,
BIND_COMPLETE_TAG => Message::BindComplete,
CLOSE_COMPLETE_TAG => Message::CloseComplete,
NOTIFICATION_RESPONSE_TAG => {
let process_id = buf.read_i32::<BigEndian>()?;
let channel = buf.read_cstr()?;
let message = buf.read_cstr()?;
Message::NotificationResponse(NotificationResponseBody {
process_id,
channel,
message,
})
}
COPY_DONE_TAG => Message::CopyDone,
COMMAND_COMPLETE_TAG => {
let tag = buf.read_cstr()?;
Message::CommandComplete(CommandCompleteBody { tag })
}
COPY_DATA_TAG => {
let storage = buf.read_all();
Message::CopyData(CopyDataBody { storage })
}
DATA_ROW_TAG => {
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::DataRow(DataRowBody { storage, len })
}
ERROR_RESPONSE_TAG => {
let storage = buf.read_all();
Message::ErrorResponse(ErrorResponseBody { storage })
}
COPY_IN_RESPONSE_TAG => {
let format = buf.read_u8()?;
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::CopyInResponse(CopyInResponseBody {
format,
len,
storage,
})
}
COPY_OUT_RESPONSE_TAG => {
let format = buf.read_u8()?;
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::CopyOutResponse(CopyOutResponseBody {
format,
len,
storage,
})
}
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
BACKEND_KEY_DATA_TAG => {
let process_id = buf.read_i32::<BigEndian>()?;
let secret_key = buf.read_i32::<BigEndian>()?;
Message::BackendKeyData(BackendKeyDataBody {
process_id,
secret_key,
})
}
NO_DATA_TAG => Message::NoData,
NOTICE_RESPONSE_TAG => {
let storage = buf.read_all();
Message::NoticeResponse(NoticeResponseBody { storage })
}
AUTHENTICATION_TAG => match buf.read_i32::<BigEndian>()? {
0 => Message::AuthenticationOk,
3 => Message::AuthenticationCleartextPassword,
5 => {
let mut salt = [0; 4];
buf.read_exact(&mut salt)?;
Message::AuthenticationMd5Password(AuthenticationMd5PasswordBody { salt })
}
10 => {
let storage = buf.read_all();
Message::AuthenticationSasl(AuthenticationSaslBody(storage))
}
11 => {
let storage = buf.read_all();
Message::AuthenticationSaslContinue(AuthenticationSaslContinueBody(storage))
}
12 => {
let storage = buf.read_all();
Message::AuthenticationSaslFinal(AuthenticationSaslFinalBody(storage))
}
tag => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown authentication tag `{tag}`"),
));
}
},
PORTAL_SUSPENDED_TAG => Message::PortalSuspended,
PARAMETER_STATUS_TAG => {
let name = buf.read_cstr()?;
let value = buf.read_cstr()?;
Message::ParameterStatus(ParameterStatusBody { name, value })
}
PARAMETER_DESCRIPTION_TAG => {
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::ParameterDescription(ParameterDescriptionBody { storage, len })
}
ROW_DESCRIPTION_TAG => {
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::RowDescription(RowDescriptionBody { storage, len })
}
READY_FOR_QUERY_TAG => {
let status = buf.read_u8()?;
Message::ReadyForQuery(ReadyForQueryBody { status })
}
tag => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown message tag `{tag}`"),
));
}
};
if !buf.is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid message length: expected buffer to be empty",
));
}
Ok(Some(message))
}
}
struct Buffer {
bytes: Bytes,
idx: usize,
}
impl Buffer {
#[inline]
fn slice(&self) -> &[u8] {
&self.bytes[self.idx..]
}
#[inline]
fn is_empty(&self) -> bool {
self.slice().is_empty()
}
#[inline]
fn read_cstr(&mut self) -> io::Result<Bytes> {
match memchr(0, self.slice()) {
Some(pos) => {
let start = self.idx;
let end = start + pos;
let cstr = self.bytes.slice(start..end);
self.idx = end + 1;
Ok(cstr)
}
None => Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF",
)),
}
}
#[inline]
fn read_all(&mut self) -> Bytes {
let buf = self.bytes.slice(self.idx..);
self.idx = self.bytes.len();
buf
}
}
impl Read for Buffer {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = {
let slice = self.slice();
let len = std::cmp::min(slice.len(), buf.len());
buf[..len].copy_from_slice(&slice[..len]);
len
};
self.idx += len;
Ok(len)
}
}
#[derive(Debug)]
pub struct AuthenticationMd5PasswordBody {
salt: [u8; 4],
}
impl AuthenticationMd5PasswordBody {
#[inline]
#[must_use]
pub fn salt(&self) -> [u8; 4] {
self.salt
}
}
#[derive(Debug)]
pub struct AuthenticationSaslBody(Bytes);
impl AuthenticationSaslBody {
#[inline]
pub fn data(&self) -> &[u8] {
&self.0
}
#[inline]
pub fn mechanisms(&self) -> SaslMechanisms<'_> {
SaslMechanisms { buf: &self.0 }
}
}
#[derive(Debug)]
pub struct SaslMechanisms<'a> {
buf: &'a [u8],
}
impl<'a> Iterator for SaslMechanisms<'a> {
type Item = &'a str;
fn next(&mut self) -> Option<Self::Item> {
if self.buf.is_empty() || self.buf[0] == 0 {
return None;
}
match memchr(0, self.buf) {
Some(pos) => {
let mechanism = str::from_utf8(&self.buf[..pos]).ok()?;
self.buf = &self.buf[pos + 1..];
Some(mechanism)
}
None => None,
}
}
}
#[derive(Debug)]
pub struct AuthenticationSaslContinueBody(Bytes);
impl AuthenticationSaslContinueBody {
#[inline]
pub fn data(&self) -> &[u8] {
&self.0
}
}
#[derive(Debug)]
pub struct AuthenticationSaslFinalBody(Bytes);
impl AuthenticationSaslFinalBody {
#[inline]
pub fn data(&self) -> &[u8] {
&self.0
}
}
#[derive(Debug)]
pub struct BackendKeyDataBody {
process_id: i32,
secret_key: i32,
}
impl BackendKeyDataBody {
#[inline]
#[must_use]
pub fn process_id(&self) -> i32 {
self.process_id
}
#[inline]
#[must_use]
pub fn secret_key(&self) -> i32 {
self.secret_key
}
}
#[derive(Debug)]
pub struct CommandCompleteBody {
tag: Bytes,
}
impl CommandCompleteBody {
#[inline]
pub fn tag(&self) -> io::Result<&str> {
str::from_utf8(&self.tag).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
}
}
#[derive(Debug)]
pub struct CopyDataBody {
storage: Bytes,
}
impl CopyDataBody {
#[inline]
pub fn data(&self) -> &[u8] {
&self.storage
}
#[inline]
pub fn into_bytes(self) -> Bytes {
self.storage
}
}
#[derive(Debug)]
pub struct CopyInResponseBody {
format: u8,
len: u16,
#[expect(
dead_code,
reason = "owns the backing buffer referenced by `format`/`len`; must live as long as the message"
)]
storage: Bytes,
}
impl CopyInResponseBody {
#[inline]
pub fn format(&self) -> u8 {
self.format
}
#[inline]
pub fn column_count(&self) -> u16 {
self.len
}
}
#[derive(Debug)]
pub struct CopyOutResponseBody {
format: u8,
len: u16,
#[expect(
dead_code,
reason = "owns the backing buffer referenced by `format`/`len`; must live as long as the message"
)]
storage: Bytes,
}
impl CopyOutResponseBody {
#[inline]
pub fn format(&self) -> u8 {
self.format
}
#[inline]
pub fn column_count(&self) -> u16 {
self.len
}
}
#[derive(Debug, Clone)]
pub struct DataRowBody {
storage: Bytes,
len: u16,
}
impl DataRowBody {
#[inline]
pub fn ranges(&self) -> DataRowRanges<'_> {
DataRowRanges {
buf: &self.storage,
len: self.storage.len(),
remaining: self.len,
}
}
#[inline]
pub fn buffer(&self) -> &[u8] {
&self.storage
}
#[inline]
pub fn column_count(&self) -> u16 {
self.len
}
#[inline]
pub fn compute_all_offsets(&self) -> Vec<Option<(usize, usize)>> {
let mut offsets = Vec::with_capacity(self.len as usize);
let mut pos = 0usize;
let buf = &self.storage[..];
for _ in 0..self.len as usize {
if pos + 4 > buf.len() {
break;
}
let len = i32::from_be_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]);
pos += 4;
if len >= 0 {
let len = usize::try_from(len).expect("len >= 0 checked above");
let start = pos;
let end = start.saturating_add(len);
if end > buf.len() {
break;
}
offsets.push(Some((start, end)));
pos = end;
} else {
offsets.push(None);
}
}
offsets
}
#[inline]
pub fn get_column_bytes(&self, idx: usize) -> Option<&[u8]> {
if idx >= self.len as usize {
return None;
}
let mut buf = &self.storage[..];
for _ in 0..idx {
if buf.len() < 4 {
return None;
}
let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
buf = &buf[4..];
if len >= 0 {
let len = usize::try_from(len).expect("len >= 0 checked above");
if buf.len() < len {
return None;
}
buf = &buf[len..];
}
}
if buf.len() < 4 {
return None;
}
let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
if len < 0 {
None } else {
let len = usize::try_from(len).expect("len >= 0 checked above");
let data_start = 4;
if buf.len() < data_start + len {
return None;
}
Some(&buf[data_start..data_start + len])
}
}
#[inline]
pub fn is_column_null(&self, idx: usize) -> bool {
if idx >= self.len as usize {
return true;
}
let mut buf = &self.storage[..];
for _ in 0..idx {
if buf.len() < 4 {
return true;
}
let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
buf = &buf[4..];
if len >= 0 {
let len = usize::try_from(len).expect("len >= 0 checked above");
if buf.len() < len {
return true;
}
buf = &buf[len..];
}
}
if buf.len() < 4 {
return true;
}
let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
len < 0
}
}
#[derive(Debug)]
pub struct DataRowRanges<'a> {
buf: &'a [u8],
len: usize,
remaining: u16,
}
impl Iterator for DataRowRanges<'_> {
type Item = io::Result<Option<Range<usize>>>;
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return if self.buf.is_empty() {
None
} else {
Some(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid message length: extra data in data row",
)))
};
}
self.remaining -= 1;
if self.buf.len() < 4 {
return Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF reading column length",
)));
}
let len = i32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]);
self.buf = &self.buf[4..];
if len < 0 {
Some(Ok(None)) } else {
let len = usize::try_from(len).expect("len >= 0 in else branch");
if self.buf.len() < len {
return Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF",
)));
}
let base = self.len - self.buf.len();
self.buf = &self.buf[len..];
Some(Ok(Some(base..base + len)))
}
}
}
#[derive(Debug)]
pub struct ErrorResponseBody {
storage: Bytes,
}
impl ErrorResponseBody {
#[inline]
pub fn fields(&self) -> ErrorFields<'_> {
ErrorFields { buf: &self.storage }
}
}
#[derive(Debug)]
pub struct ErrorFields<'a> {
buf: &'a [u8],
}
impl<'a> Iterator for ErrorFields<'a> {
type Item = io::Result<ErrorField<'a>>;
fn next(&mut self) -> Option<Self::Item> {
if self.buf.is_empty() {
return None;
}
let type_ = self.buf[0];
self.buf = &self.buf[1..];
if type_ == 0 {
return None;
}
match memchr(0, self.buf) {
Some(pos) => {
let value = &self.buf[..pos];
self.buf = &self.buf[pos + 1..];
Some(Ok(ErrorField { type_, value }))
}
None => Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF in error field",
))),
}
}
}
#[derive(Debug)]
pub struct ErrorField<'a> {
type_: u8,
value: &'a [u8],
}
impl ErrorField<'_> {
#[inline]
#[must_use]
pub fn type_(&self) -> u8 {
self.type_
}
#[inline]
#[must_use]
pub fn value_bytes(&self) -> &[u8] {
self.value
}
#[inline]
pub fn value(&self) -> io::Result<&str> {
str::from_utf8(self.value).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
}
}
#[derive(Debug)]
pub struct NoticeResponseBody {
storage: Bytes,
}
impl NoticeResponseBody {
#[inline]
pub fn fields(&self) -> ErrorFields<'_> {
ErrorFields { buf: &self.storage }
}
}
#[derive(Debug)]
pub struct NotificationResponseBody {
process_id: i32,
channel: Bytes,
message: Bytes,
}
impl NotificationResponseBody {
#[inline]
pub fn process_id(&self) -> i32 {
self.process_id
}
#[inline]
pub fn channel(&self) -> io::Result<&str> {
str::from_utf8(&self.channel).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
}
#[inline]
pub fn message(&self) -> io::Result<&str> {
str::from_utf8(&self.message).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
}
}
#[derive(Debug)]
pub struct ParameterDescriptionBody {
storage: Bytes,
len: u16,
}
impl ParameterDescriptionBody {
#[inline]
pub fn parameters(&self) -> Parameters<'_> {
Parameters {
buf: &self.storage,
remaining: self.len,
}
}
}
#[derive(Debug)]
pub struct Parameters<'a> {
buf: &'a [u8],
remaining: u16,
}
impl Iterator for Parameters<'_> {
type Item = io::Result<Oid>;
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return None;
}
self.remaining -= 1;
if self.buf.len() < 4 {
return Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF",
)));
}
let oid = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]);
self.buf = &self.buf[4..];
Some(Ok(Oid::new(oid)))
}
}
#[derive(Debug)]
pub struct ParameterStatusBody {
name: Bytes,
value: Bytes,
}
impl ParameterStatusBody {
#[inline]
pub fn name(&self) -> io::Result<&str> {
str::from_utf8(&self.name).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
}
#[inline]
pub fn value(&self) -> io::Result<&str> {
str::from_utf8(&self.value).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
}
}
#[derive(Debug)]
pub struct ReadyForQueryBody {
status: u8,
}
impl ReadyForQueryBody {
#[inline]
#[must_use]
pub fn status(&self) -> u8 {
self.status
}
}
#[derive(Debug)]
pub struct RowDescriptionBody {
storage: Bytes,
len: u16,
}
impl RowDescriptionBody {
#[inline]
pub fn fields(&self) -> Fields<'_> {
Fields {
buf: &self.storage,
remaining: self.len,
}
}
#[inline]
pub fn field_count(&self) -> u16 {
self.len
}
}
#[derive(Debug)]
pub struct Fields<'a> {
buf: &'a [u8],
remaining: u16,
}
impl<'a> Iterator for Fields<'a> {
type Item = io::Result<Field<'a>>;
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return None;
}
self.remaining -= 1;
let Some(name_end) = memchr(0, self.buf) else {
return Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF in field name",
)));
};
let name = match str::from_utf8(&self.buf[..name_end]) {
Ok(s) => s,
Err(e) => return Some(Err(io::Error::new(io::ErrorKind::InvalidInput, e))),
};
self.buf = &self.buf[name_end + 1..];
if self.buf.len() < 18 {
return Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF in field description",
)));
}
let table_oid = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]);
let column_id = i16::from_be_bytes([self.buf[4], self.buf[5]]);
let type_oid = u32::from_be_bytes([self.buf[6], self.buf[7], self.buf[8], self.buf[9]]);
let type_size = i16::from_be_bytes([self.buf[10], self.buf[11]]);
let type_modifier =
i32::from_be_bytes([self.buf[12], self.buf[13], self.buf[14], self.buf[15]]);
let format = i16::from_be_bytes([self.buf[16], self.buf[17]]);
self.buf = &self.buf[18..];
Some(Ok(Field {
name,
table_oid: Oid::new(table_oid),
column_id,
type_oid: Oid::new(type_oid),
type_size,
type_modifier,
format,
}))
}
}
#[derive(Debug)]
pub struct Field<'a> {
name: &'a str,
table_oid: Oid,
column_id: i16,
type_oid: Oid,
type_size: i16,
type_modifier: i32,
format: i16,
}
impl<'a> Field<'a> {
#[inline]
#[must_use]
pub fn name(&self) -> &'a str {
self.name
}
#[inline]
#[must_use]
pub fn table_oid(&self) -> Oid {
self.table_oid
}
#[inline]
#[must_use]
pub fn column_id(&self) -> i16 {
self.column_id
}
#[inline]
#[must_use]
pub fn type_oid(&self) -> Oid {
self.type_oid
}
#[inline]
#[must_use]
pub fn type_size(&self) -> i16 {
self.type_size
}
#[inline]
#[must_use]
pub fn type_modifier(&self) -> i32 {
self.type_modifier
}
#[inline]
#[must_use]
pub fn format(&self) -> i16 {
self.format
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn compute_all_offsets_handles_many_columns() {
let mut buf = Vec::new();
for i in 0u8..40 {
buf.extend_from_slice(&1i32.to_be_bytes());
buf.push(i);
}
let row = DataRowBody {
storage: Bytes::from(buf),
len: 40,
};
let offsets = row.compute_all_offsets();
assert_eq!(offsets.len(), 40);
let expect_range = |idx: usize, val: u8| {
let (start, end) = offsets[idx].expect("expected non-null");
let slice = &row.buffer()[start..end];
assert_eq!(slice, &[val]);
};
expect_range(0, 0);
expect_range(10, 10);
expect_range(39, 39);
}
#[test]
fn compute_all_offsets_tracks_nulls() {
let mut buf = Vec::new();
buf.extend_from_slice(&1i32.to_be_bytes());
buf.push(0xAA);
buf.extend_from_slice(&(-1i32).to_be_bytes()); buf.extend_from_slice(&1i32.to_be_bytes());
buf.push(0xBB);
let row = DataRowBody {
storage: Bytes::from(buf),
len: 3,
};
let offsets = row.compute_all_offsets();
assert_eq!(offsets.len(), 3);
assert!(offsets[0].is_some());
assert!(offsets[1].is_none());
assert!(offsets[2].is_some());
}
}