use bytes::Bytes;
use crate::{Error, Result, SemanticMeta};
pub type SchemaId = u32;
#[repr(C, align(64))]
#[derive(Debug, Clone)]
pub struct Frame {
pub header: FrameHeader,
pub payload: Bytes,
pub semantics: Option<SemanticMeta>,
}
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct FrameHeader {
pub version: u8,
pub flags: FrameFlags,
pub sequence: u64,
pub length: u32,
pub schema_id: u32, pub checksum: u32, }
bitflags::bitflags! {
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameFlags: u16 {
const COMPRESSED = 0b0000_0001;
const ENCRYPTED = 0b0000_0010;
const CHUNKED = 0b0000_0100;
const FINAL = 0b0000_1000;
const SCHEMA = 0b0001_0000;
const SIMD_HINT = 0b0010_0000;
const NUMERIC = 0b0100_0000;
const CHECKSUM = 0b1000_0000;
}
}
impl Frame {
pub fn new(payload: Bytes) -> Self {
Self {
header: FrameHeader {
version: 1,
flags: FrameFlags::empty(),
sequence: 0,
length: payload.len() as u32,
schema_id: 0,
checksum: 0,
},
payload,
semantics: None,
}
}
pub fn with_semantics(payload: Bytes, semantics: SemanticMeta) -> Self {
let mut frame = Self::new(payload);
frame.semantics = Some(semantics);
frame.header.flags |= FrameFlags::SIMD_HINT;
frame
}
pub fn with_sequence(mut self, sequence: u64) -> Self {
self.header.sequence = sequence;
self
}
pub fn with_schema(mut self, schema_id: SchemaId) -> Self {
self.header.schema_id = schema_id;
self.header.flags |= FrameFlags::SCHEMA;
self
}
pub fn with_compression(mut self) -> Self {
self.header.flags |= FrameFlags::COMPRESSED;
self
}
pub fn with_checksum(mut self) -> Self {
self.header.checksum = crc32c(&self.payload);
self.header.flags |= FrameFlags::CHECKSUM;
self
}
pub fn validate(&self) -> Result<()> {
if self.header.version != 1 {
return Err(Error::invalid_frame(format!(
"Unsupported version: {}",
self.header.version
)));
}
if self.header.length != self.payload.len() as u32 {
return Err(Error::invalid_frame(format!(
"Length mismatch: header={}, payload={}",
self.header.length,
self.payload.len()
)));
}
if self.header.flags.contains(FrameFlags::CHECKSUM) {
let actual = crc32c(&self.payload);
if actual != self.header.checksum {
return Err(Error::invalid_frame(format!(
"Checksum mismatch: expected={:08x}, actual={:08x}",
self.header.checksum, actual
)));
}
}
Ok(())
}
pub fn is_numeric(&self) -> bool {
self.header.flags.contains(FrameFlags::NUMERIC)
}
pub fn has_semantics(&self) -> bool {
self.header.flags.contains(FrameFlags::SIMD_HINT)
}
}
impl FrameHeader {
pub const SIZE: usize = std::mem::size_of::<Self>();
}
fn crc32c(data: &[u8]) -> u32 {
crc32c_sw(data)
}
fn crc32c_sw(data: &[u8]) -> u32 {
const CRC32C_POLY: u32 = 0x82F63B78;
let mut crc = !0u32;
for &byte in data {
crc ^= u32::from(byte);
for _ in 0..8 {
crc = if crc & 1 == 1 {
(crc >> 1) ^ CRC32C_POLY
} else {
crc >> 1
};
}
}
!crc
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_frame_creation() {
let payload = Bytes::from_static(b"Hello, PJS!");
let frame = Frame::new(payload.clone());
assert_eq!(frame.header.version, 1);
assert_eq!(frame.header.length, payload.len() as u32);
assert_eq!(frame.payload, payload);
}
#[test]
fn test_checksum_validation() {
let payload = Bytes::from_static(b"checksum test");
let frame = Frame::new(payload).with_checksum();
frame.validate().unwrap();
let mut bad_frame = frame.clone();
bad_frame.payload = Bytes::from_static(b"corrupted data");
assert!(bad_frame.validate().is_err());
}
}