pjson_rs/
frame.rs

//! Frame format and utilities for the PJS protocol.
2
3use bytes::Bytes;
4
5use crate::{Error, Result, SemanticMeta};
6
/// Unique identifier of a schema that a frame's payload can be validated against.
///
/// Carried in the frame header's `schema_id` field, where `0` is reserved to
/// mean "no schema".
pub type SchemaId = u32;
9
/// A single protocol frame: header, payload bytes and optional semantic hints.
///
/// `repr(C, align(64))` fixes the field order and aligns every `Frame` to a
/// 64-byte boundary (a common cache-line size). The payload is held as a
/// [`Bytes`] handle, so per the `bytes` crate cloning a frame shares the payload
/// buffer instead of copying it.
#[repr(C, align(64))]
#[derive(Debug, Clone)]
pub struct Frame {
    /// Frame header with metadata (version, flags, sequence, length, schema, checksum).
    pub header: FrameHeader,
    /// Payload as zero-copy bytes.
    pub payload: Bytes,
    /// Optional semantic annotations used as optimization hints.
    pub semantics: Option<SemanticMeta>,
}
21
22/// Frame header for wire format
23#[repr(C)]
24#[derive(Debug, Clone, Copy)]
25pub struct FrameHeader {
26    /// Protocol version (currently 1)
27    pub version: u8,
28    /// Frame type and processing flags
29    pub flags: FrameFlags,
30    /// Sequence number for ordering and deduplication
31    pub sequence: u64,
32    /// Payload length in bytes
33    pub length: u32,
34    /// Optional schema ID for validation
35    pub schema_id: u32, // 0 means no schema
36    /// CRC32C checksum of payload (optional)
37    pub checksum: u32, // 0 means no checksum
38}
39
40bitflags::bitflags! {
41    /// Frame processing flags
42    #[repr(transparent)]
43    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
44    pub struct FrameFlags: u16 {
45        /// Payload is compressed
46        const COMPRESSED = 0b0000_0001;
47        /// Payload is encrypted
48        const ENCRYPTED  = 0b0000_0010;
49        /// Frame is part of chunked sequence
50        const CHUNKED    = 0b0000_0100;
51        /// Final frame in sequence
52        const FINAL      = 0b0000_1000;
53        /// Schema validation required
54        const SCHEMA     = 0b0001_0000;
55        /// Contains semantic hints for SIMD optimization
56        const SIMD_HINT  = 0b0010_0000;
57        /// Payload contains numeric array data
58        const NUMERIC    = 0b0100_0000;
59        /// Checksum present
60        const CHECKSUM   = 0b1000_0000;
61    }
62}
63
64impl Frame {
65    /// Create a new frame with given payload
66    pub fn new(payload: Bytes) -> Self {
67        Self {
68            header: FrameHeader {
69                version: 1,
70                flags: FrameFlags::empty(),
71                sequence: 0,
72                length: payload.len() as u32,
73                schema_id: 0,
74                checksum: 0,
75            },
76            payload,
77            semantics: None,
78        }
79    }
80
81    /// Create frame with semantic hints for optimization
82    pub fn with_semantics(payload: Bytes, semantics: SemanticMeta) -> Self {
83        let mut frame = Self::new(payload);
84        frame.semantics = Some(semantics);
85        frame.header.flags |= FrameFlags::SIMD_HINT;
86        frame
87    }
88
89    /// Set sequence number
90    pub fn with_sequence(mut self, sequence: u64) -> Self {
91        self.header.sequence = sequence;
92        self
93    }
94
95    /// Set schema ID for validation
96    pub fn with_schema(mut self, schema_id: SchemaId) -> Self {
97        self.header.schema_id = schema_id;
98        self.header.flags |= FrameFlags::SCHEMA;
99        self
100    }
101
102    /// Enable compression
103    pub fn with_compression(mut self) -> Self {
104        self.header.flags |= FrameFlags::COMPRESSED;
105        self
106    }
107
108    /// Calculate and set checksum
109    pub fn with_checksum(mut self) -> Self {
110        self.header.checksum = crc32c(&self.payload);
111        self.header.flags |= FrameFlags::CHECKSUM;
112        self
113    }
114
115    /// Validate frame integrity
116    pub fn validate(&self) -> Result<()> {
117        // Check version
118        if self.header.version != 1 {
119            return Err(Error::invalid_frame(format!(
120                "Unsupported version: {}",
121                self.header.version
122            )));
123        }
124
125        // Check length
126        if self.header.length != self.payload.len() as u32 {
127            return Err(Error::invalid_frame(format!(
128                "Length mismatch: header={}, payload={}",
129                self.header.length,
130                self.payload.len()
131            )));
132        }
133
134        // Verify checksum if present
135        if self.header.flags.contains(FrameFlags::CHECKSUM) {
136            let actual = crc32c(&self.payload);
137            if actual != self.header.checksum {
138                return Err(Error::invalid_frame(format!(
139                    "Checksum mismatch: expected={:08x}, actual={:08x}",
140                    self.header.checksum, actual
141                )));
142            }
143        }
144
145        Ok(())
146    }
147
148    // TODO: Implement serialization methods later when needed
149
150    /// Check if frame contains numeric array data
151    pub fn is_numeric(&self) -> bool {
152        self.header.flags.contains(FrameFlags::NUMERIC)
153    }
154
155    /// Check if frame has semantic hints
156    pub fn has_semantics(&self) -> bool {
157        self.header.flags.contains(FrameFlags::SIMD_HINT)
158    }
159}
160
161impl FrameHeader {
162    /// Header size in bytes
163    pub const SIZE: usize = std::mem::size_of::<Self>();
164}
165
/// CRC-32C (Castagnoli) checksum used for frame payload integrity.
///
/// Currently always delegates to the portable software implementation.
fn crc32c(data: &[u8]) -> u32 {
    // TODO: Use hardware CRC32C if available (SSE4.2)
    crc32c_sw(data)
}

/// Portable, table-free CRC-32C.
///
/// Shifts each input bit through the register using the reflected Castagnoli
/// polynomial `0x82F63B78`, starting from all ones and inverting the result.
fn crc32c_sw(data: &[u8]) -> u32 {
    const POLY_REFLECTED: u32 = 0x82F6_3B78;

    let register = data.iter().fold(u32::MAX, |acc, &byte| {
        (0..8).fold(acc ^ u32::from(byte), |reg, _| {
            // All ones when the low bit is set, zero otherwise.
            let mask = (reg & 1).wrapping_neg();
            (reg >> 1) ^ (POLY_REFLECTED & mask)
        })
    });

    !register
}
190
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_frame_creation() {
        let payload = Bytes::from_static(b"Hello, PJS!");
        let frame = Frame::new(payload.clone());

        assert_eq!(frame.header.version, 1);
        assert_eq!(frame.header.length, payload.len() as u32);
        assert_eq!(frame.payload, payload);
        assert!(frame.header.flags.is_empty());
        frame.validate().unwrap();
    }

    // Serialization tests will be added when serialization is implemented

    #[test]
    fn test_checksum_validation() {
        let payload = Bytes::from_static(b"checksum test");
        let frame = Frame::new(payload).with_checksum();

        assert!(frame.header.flags.contains(FrameFlags::CHECKSUM));
        frame.validate().unwrap();

        // Corrupt the payload but keep its length, so the failure must come
        // from the checksum comparison rather than the length check.
        let mut bad_frame = frame.clone();
        bad_frame.payload = Bytes::from_static(b"checksum TEST");
        assert_eq!(bad_frame.header.length as usize, bad_frame.payload.len());

        assert!(bad_frame.validate().is_err());
    }

    #[test]
    fn test_length_mismatch() {
        let mut frame = Frame::new(Bytes::from_static(b"abc"));
        frame.header.length = 4;
        assert!(frame.validate().is_err());
    }

    #[test]
    fn test_unsupported_version() {
        let mut frame = Frame::new(Bytes::from_static(b"v"));
        frame.header.version = 2;
        assert!(frame.validate().is_err());
    }

    #[test]
    fn test_crc32c_known_vector() {
        // Standard CRC-32C check value for the ASCII string "123456789".
        assert_eq!(crc32c(b"123456789"), 0xE306_9283);
        assert_eq!(crc32c(b""), 0);
    }

    #[test]
    fn test_builder_flags() {
        let frame = Frame::new(Bytes::new())
            .with_sequence(7)
            .with_schema(42)
            .with_compression();

        assert_eq!(frame.header.sequence, 7);
        assert_eq!(frame.header.schema_id, 42);
        assert!(frame
            .header
            .flags
            .contains(FrameFlags::SCHEMA | FrameFlags::COMPRESSED));
        assert!(!frame.has_semantics());
        assert!(!frame.is_numeric());
        frame.validate().unwrap();
    }
}