1use bytes::Bytes;
4
5use crate::{Error, Result, SemanticMeta};
6
7pub type SchemaId = u32;
9
10#[repr(C, align(64))]
12#[derive(Debug, Clone)]
13pub struct Frame {
14 pub header: FrameHeader,
16 pub payload: Bytes,
18 pub semantics: Option<SemanticMeta>,
20}
21
22#[repr(C)]
24#[derive(Debug, Clone, Copy)]
25pub struct FrameHeader {
26 pub version: u8,
28 pub flags: FrameFlags,
30 pub sequence: u64,
32 pub length: u32,
34 pub schema_id: u32, pub checksum: u32, }
39
40bitflags::bitflags! {
41 #[repr(transparent)]
43 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
44 pub struct FrameFlags: u16 {
45 const COMPRESSED = 0b0000_0001;
47 const ENCRYPTED = 0b0000_0010;
49 const CHUNKED = 0b0000_0100;
51 const FINAL = 0b0000_1000;
53 const SCHEMA = 0b0001_0000;
55 const SIMD_HINT = 0b0010_0000;
57 const NUMERIC = 0b0100_0000;
59 const CHECKSUM = 0b1000_0000;
61 }
62}
63
64impl Frame {
65 pub fn new(payload: Bytes) -> Self {
67 Self {
68 header: FrameHeader {
69 version: 1,
70 flags: FrameFlags::empty(),
71 sequence: 0,
72 length: payload.len() as u32,
73 schema_id: 0,
74 checksum: 0,
75 },
76 payload,
77 semantics: None,
78 }
79 }
80
81 pub fn with_semantics(payload: Bytes, semantics: SemanticMeta) -> Self {
83 let mut frame = Self::new(payload);
84 frame.semantics = Some(semantics);
85 frame.header.flags |= FrameFlags::SIMD_HINT;
86 frame
87 }
88
89 pub fn with_sequence(mut self, sequence: u64) -> Self {
91 self.header.sequence = sequence;
92 self
93 }
94
95 pub fn with_schema(mut self, schema_id: SchemaId) -> Self {
97 self.header.schema_id = schema_id;
98 self.header.flags |= FrameFlags::SCHEMA;
99 self
100 }
101
102 pub fn with_compression(mut self) -> Self {
104 self.header.flags |= FrameFlags::COMPRESSED;
105 self
106 }
107
108 pub fn with_checksum(mut self) -> Self {
110 self.header.checksum = crc32c(&self.payload);
111 self.header.flags |= FrameFlags::CHECKSUM;
112 self
113 }
114
115 pub fn validate(&self) -> Result<()> {
117 if self.header.version != 1 {
119 return Err(Error::invalid_frame(format!(
120 "Unsupported version: {}",
121 self.header.version
122 )));
123 }
124
125 if self.header.length != self.payload.len() as u32 {
127 return Err(Error::invalid_frame(format!(
128 "Length mismatch: header={}, payload={}",
129 self.header.length,
130 self.payload.len()
131 )));
132 }
133
134 if self.header.flags.contains(FrameFlags::CHECKSUM) {
136 let actual = crc32c(&self.payload);
137 if actual != self.header.checksum {
138 return Err(Error::invalid_frame(format!(
139 "Checksum mismatch: expected={:08x}, actual={:08x}",
140 self.header.checksum, actual
141 )));
142 }
143 }
144
145 Ok(())
146 }
147
148 pub fn is_numeric(&self) -> bool {
152 self.header.flags.contains(FrameFlags::NUMERIC)
153 }
154
155 pub fn has_semantics(&self) -> bool {
157 self.header.flags.contains(FrameFlags::SIMD_HINT)
158 }
159}
160
161impl FrameHeader {
162 pub const SIZE: usize = std::mem::size_of::<Self>();
164}
165
166fn crc32c(data: &[u8]) -> u32 {
168 crc32c_sw(data)
170}
171
172fn crc32c_sw(data: &[u8]) -> u32 {
174 const CRC32C_POLY: u32 = 0x82F63B78;
175 let mut crc = !0u32;
176
177 for &byte in data {
178 crc ^= u32::from(byte);
179 for _ in 0..8 {
180 crc = if crc & 1 == 1 {
181 (crc >> 1) ^ CRC32C_POLY
182 } else {
183 crc >> 1
184 };
185 }
186 }
187
188 !crc
189}
190
191#[cfg(test)]
192mod tests {
193 use super::*;
194
195 #[test]
196 fn test_frame_creation() {
197 let payload = Bytes::from_static(b"Hello, PJS!");
198 let frame = Frame::new(payload.clone());
199
200 assert_eq!(frame.header.version, 1);
201 assert_eq!(frame.header.length, payload.len() as u32);
202 assert_eq!(frame.payload, payload);
203 }
204
205 #[test]
208 fn test_checksum_validation() {
209 let payload = Bytes::from_static(b"checksum test");
210 let frame = Frame::new(payload).with_checksum();
211
212 frame.validate().unwrap();
213
214 let mut bad_frame = frame.clone();
216 bad_frame.payload = Bytes::from_static(b"corrupted data");
217
218 assert!(bad_frame.validate().is_err());
219 }
220}