use crate::buffer::{SBE_HEADER_SIZE, SbeReader};
use crate::error::{SbeError, SbeResult};
#[derive(Debug, Clone, Copy)]
pub struct SbeHeader {
pub message_length: u32,
pub template_id: u16,
pub schema_version: u16,
}
impl SbeHeader {
pub fn from_bytes(data: &[u8]) -> SbeResult<Self> {
if data.len() < SBE_HEADER_SIZE {
return Err(SbeError::BufferTooSmall {
need: SBE_HEADER_SIZE,
have: data.len(),
});
}
let reader = SbeReader::new(data);
let message_length = reader.read_u32(0)?;
let template_id = reader.read_u16(4)?;
let schema_version = reader.read_u16(6)?;
Ok(Self {
message_length,
template_id,
schema_version,
})
}
pub fn body_length(&self) -> usize {
self.message_length.saturating_sub(SBE_HEADER_SIZE as u32) as usize
}
}
pub struct SbeDecoder<'a> {
reader: SbeReader<'a>,
header: SbeHeader,
body_offset: usize,
}
impl<'a> SbeDecoder<'a> {
pub fn new(data: &'a [u8]) -> SbeResult<Self> {
let header = SbeHeader::from_bytes(data)?;
if header.message_length as usize > data.len() {
return Err(SbeError::InvalidMessageLength {
length: header.message_length as u16,
});
}
let reader = SbeReader::new(&data[..header.message_length as usize]);
Ok(Self {
reader,
header,
body_offset: SBE_HEADER_SIZE,
})
}
pub fn header(&self) -> &SbeHeader {
&self.header
}
pub fn template_id(&self) -> u16 {
self.header.template_id
}
pub fn schema_version(&self) -> u16 {
self.header.schema_version
}
pub fn verify_template_id(&self, expected: u16) -> SbeResult<()> {
if self.header.template_id != expected {
return Err(SbeError::InvalidTemplateId {
expected,
found: self.header.template_id,
});
}
Ok(())
}
pub fn verify_schema_version(&self, expected: u16) -> SbeResult<()> {
if self.header.schema_version != expected {
return Err(SbeError::InvalidSchemaVersion {
expected,
found: self.header.schema_version,
});
}
Ok(())
}
pub fn read_u8(&self, offset: usize) -> SbeResult<u8> {
self.reader.read_u8(self.body_offset + offset)
}
pub fn read_u16(&self, offset: usize) -> SbeResult<u16> {
self.reader.read_u16(self.body_offset + offset)
}
pub fn read_u32(&self, offset: usize) -> SbeResult<u32> {
self.reader.read_u32(self.body_offset + offset)
}
pub fn read_u64(&self, offset: usize) -> SbeResult<u64> {
self.reader.read_u64(self.body_offset + offset)
}
pub fn read_f32(&self, offset: usize) -> SbeResult<f32> {
self.reader.read_f32(self.body_offset + offset)
}
pub fn read_bytes(&self, offset: usize, length: usize) -> SbeResult<&'a [u8]> {
self.reader.read_bytes_at(self.body_offset + offset, length)
}
pub fn read_string(&self, offset: usize, length: usize) -> SbeResult<&'a str> {
self.reader
.read_string_at(self.body_offset + offset, length)
}
pub fn read_array_bytes(
&self,
offset: usize,
element_size: usize,
count: usize,
) -> SbeResult<&'a [u8]> {
let total_size = element_size
.checked_mul(count)
.ok_or(SbeError::IntegerOverflow)?;
self.read_bytes(offset, total_size)
}
pub fn read_group(&self, offset: usize) -> SbeResult<SbeGroupIterator<'a>> {
let reader_ref: &'a SbeReader<'a> = unsafe {
std::mem::transmute(&self.reader)
};
SbeGroupIterator::new(reader_ref, self.body_offset + offset)
}
pub fn reader(&self) -> &SbeReader<'a> {
&self.reader
}
pub fn body_offset(&self) -> usize {
self.body_offset
}
}
pub struct SbeGroupIterator<'a> {
reader: &'a SbeReader<'a>,
start_offset: usize,
element_count: u32,
block_length: u16,
current_index: u32,
current_offset: usize,
}
impl<'a> SbeGroupIterator<'a> {
pub fn new(reader: &'a SbeReader<'a>, offset: usize) -> SbeResult<Self> {
let element_count = reader.read_u32(offset)?;
let block_length = reader.read_u16(offset + 4)?;
if element_count > 10_000_000 {
return Err(SbeError::GroupCountTooLarge {
count: element_count,
});
}
if block_length == 0 {
return Err(SbeError::InvalidGroupBlockLength {
length: block_length,
});
}
let group_header_size = 6;
Ok(Self {
reader,
start_offset: offset,
element_count,
block_length,
current_index: 0,
current_offset: offset + group_header_size,
})
}
pub fn count(&self) -> u32 {
self.element_count
}
pub fn block_length(&self) -> u16 {
self.block_length
}
pub fn current_index(&self) -> u32 {
self.current_index
}
pub fn has_next(&self) -> bool {
self.current_index < self.element_count
}
pub fn next_element(&mut self) -> SbeResult<Option<SbeGroupElement<'a>>> {
if !self.has_next() {
return Ok(None);
}
let element = SbeGroupElement::new(self.reader, self.current_offset, self.block_length)?;
self.current_index += 1;
self.current_offset += self.block_length as usize;
Ok(Some(element))
}
pub fn seek_to(&mut self, index: u32) -> SbeResult<()> {
if index >= self.element_count {
return Err(SbeError::FieldOffsetOutOfBounds {
offset: index as usize,
length: self.element_count as usize,
});
}
self.current_index = index;
self.current_offset = self.start_offset + 6 + (index as usize * self.block_length as usize);
Ok(())
}
pub fn total_size(&self) -> usize {
6 + (self.element_count as usize * self.block_length as usize)
}
}
#[allow(dead_code)]
pub struct SbeGroupElement<'a> {
reader: &'a SbeReader<'a>,
offset: usize,
block_length: u16,
}
impl<'a> SbeGroupElement<'a> {
pub fn new(reader: &'a SbeReader<'a>, offset: usize, block_length: u16) -> SbeResult<Self> {
let end_offset = offset
.checked_add(block_length as usize)
.ok_or(SbeError::IntegerOverflow)?;
if end_offset > reader.len() {
return Err(SbeError::FieldOffsetOutOfBounds {
offset,
length: reader.len(),
});
}
Ok(Self {
reader,
offset,
block_length,
})
}
pub fn read_u8(&self, field_offset: usize) -> SbeResult<u8> {
self.reader.read_u8(self.offset + field_offset)
}
pub fn read_u16(&self, field_offset: usize) -> SbeResult<u16> {
self.reader.read_u16(self.offset + field_offset)
}
pub fn read_u32(&self, field_offset: usize) -> SbeResult<u32> {
self.reader.read_u32(self.offset + field_offset)
}
pub fn read_u64(&self, field_offset: usize) -> SbeResult<u64> {
self.reader.read_u64(self.offset + field_offset)
}
pub fn read_f32(&self, field_offset: usize) -> SbeResult<f32> {
self.reader.read_f32(self.offset + field_offset)
}
pub fn read_string(&self, field_offset: usize, length: usize) -> SbeResult<&'a str> {
self.reader
.read_string_at(self.offset + field_offset, length)
}
pub fn read_bytes(&self, field_offset: usize, length: usize) -> SbeResult<&'a [u8]> {
self.reader
.read_bytes_at(self.offset + field_offset, length)
}
pub fn read_nested_group(&self, field_offset: usize) -> SbeResult<SbeGroupIterator<'a>> {
SbeGroupIterator::new(self.reader, self.offset + field_offset)
}
}
pub struct SbeVariableData<'a> {
data: &'a [u8],
offset: usize,
}
impl<'a> SbeVariableData<'a> {
pub fn new(decoder: &SbeDecoder<'a>, start_offset: usize) -> Self {
Self {
data: decoder.reader.data,
offset: decoder.body_offset + start_offset,
}
}
pub fn read_next_string(&mut self) -> SbeResult<&'a str> {
if self.offset + 2 > self.data.len() {
return Err(SbeError::InvalidVariableDataOffset {
offset: self.offset,
});
}
let length =
u16::from_le_bytes([self.data[self.offset], self.data[self.offset + 1]]) as usize;
let data_start = self.offset + 2;
let data_end = data_start + length;
if data_end > self.data.len() {
return Err(SbeError::InvalidVariableDataOffset { offset: data_end });
}
let string_data = &self.data[data_start..data_end];
let result = std::str::from_utf8(string_data).map_err(|_| SbeError::InvalidUtf8String)?;
self.offset = data_end;
Ok(result)
}
pub fn read_next_bytes(&mut self) -> SbeResult<&'a [u8]> {
if self.offset + 2 > self.data.len() {
return Err(SbeError::InvalidVariableDataOffset {
offset: self.offset,
});
}
let length =
u16::from_le_bytes([self.data[self.offset], self.data[self.offset + 1]]) as usize;
let data_start = self.offset + 2;
let data_end = data_start + length;
if data_end > self.data.len() {
return Err(SbeError::InvalidVariableDataOffset { offset: data_end });
}
let result = &self.data[data_start..data_end];
self.offset = data_end;
Ok(result)
}
pub fn has_more(&self) -> bool {
self.offset < self.data.len()
}
pub fn offset(&self) -> usize {
self.offset
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::buffer::SbeBuffer;
#[test]
fn test_header_parsing() {
let mut buffer = SbeBuffer::new();
buffer.write_bytes(&100u32.to_le_bytes()).unwrap();
buffer.write_bytes(&1u16.to_le_bytes()).unwrap();
buffer.write_bytes(&0u16.to_le_bytes()).unwrap();
let header = SbeHeader::from_bytes(buffer.as_slice()).unwrap();
assert_eq!(header.message_length, 100);
assert_eq!(header.template_id, 1);
assert_eq!(header.schema_version, 0);
}
#[test]
fn test_decoder_creation() {
let mut buffer = SbeBuffer::new();
buffer.write_bytes(&16u32.to_le_bytes()).unwrap(); buffer.write_bytes(&1u16.to_le_bytes()).unwrap(); buffer.write_bytes(&0u16.to_le_bytes()).unwrap(); buffer.write_bytes(&[0u8; 8]).unwrap();
let decoder = SbeDecoder::new(buffer.as_slice()).unwrap();
assert_eq!(decoder.template_id(), 1);
assert_eq!(decoder.schema_version(), 0);
}
#[test]
fn test_field_reading() {
let mut buffer = SbeBuffer::new();
buffer.write_bytes(&20u32.to_le_bytes()).unwrap();
buffer.write_bytes(&1u16.to_le_bytes()).unwrap();
buffer.write_bytes(&0u16.to_le_bytes()).unwrap();
buffer.write_bytes(&42u32.to_le_bytes()).unwrap();
buffer.write_bytes(&1234u64.to_le_bytes()).unwrap();
let decoder = SbeDecoder::new(buffer.as_slice()).unwrap();
assert_eq!(decoder.read_u32(0).unwrap(), 42);
assert_eq!(decoder.read_u64(4).unwrap(), 1234);
}
}