Skip to main content

lnc_network/
protocol.rs

1use lnc_core::{LANCE_MAGIC, LanceError, Result, crc32c};
2use zerocopy::{FromBytes, Immutable, IntoBytes};
3
/// Protocol version byte of the LWP header.
///
/// `LwpHeader::new` writes this value into every header it builds, and
/// `LwpHeader::parse` rejects any buffer whose version byte differs from it.
pub const PROTOCOL_VERSION: u8 = 1;
5
/// Flag values for the `flags` byte of an LWP header.
///
/// Every variant other than `None` occupies its own bit, so several of them
/// can be OR-ed into a single byte and tested one at a time.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LwpFlags {
    /// No bit set (`0x00`).
    None = 0,
    /// Bit 0 (`0x01`).
    Compressed = 1 << 0,
    /// Bit 1 (`0x02`).
    Encrypted = 1 << 1,
    /// Bit 2 (`0x04`).
    BatchMode = 1 << 2,
    /// Bit 3 (`0x08`).
    Ack = 1 << 3,
    /// Bit 4 (`0x10`).
    Backpressure = 1 << 4,
    /// Bit 5 (`0x20`).
    Keepalive = 1 << 5,
    /// Bit 6 (`0x40`).
    Control = 1 << 6,
}
18
19impl From<u8> for LwpFlags {
20    fn from(value: u8) -> Self {
21        match value {
22            0x01 => Self::Compressed,
23            0x02 => Self::Encrypted,
24            0x04 => Self::BatchMode,
25            0x08 => Self::Ack,
26            0x10 => Self::Backpressure,
27            0x20 => Self::Keepalive,
28            0x40 => Self::Control,
29            _ => Self::None,
30        }
31    }
32}
33
/// One-byte command codes, grouped by numeric range.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ControlCommand {
    // ---- Topic management: 0x01..=0x06 ----
    CreateTopic = 0x01,
    DeleteTopic = 0x02,
    ListTopics = 0x03,
    GetTopic = 0x04,
    /// Sets the retention policy of a topic.
    SetRetention = 0x05,
    /// Creates a topic together with its retention configuration.
    CreateTopicWithRetention = 0x06,

    // ---- Cluster management: 0x07..=0x0F (0x07..=0x09 assigned) ----
    /// Requests cluster status and health information.
    GetClusterStatus = 0x07,
    /// Authenticates the client with a token.
    Authenticate = 0x08,
    /// Reply to `Authenticate`, reporting success or failure.
    AuthenticateResponse = 0x09,

    // ---- Data fetch, request/response: 0x10..=0x1F ----
    Fetch = 0x10,
    FetchResponse = 0x11,
    /// The server has not replicated up to the requested offset yet; the
    /// client should back off and retry.
    CatchingUp = 0x12,

    // ---- Streaming control: 0x20..=0x2F ----
    /// Subscribes to a topic for streaming; the server will push data.
    Subscribe = 0x20,
    /// Unsubscribes from a topic, stopping the stream.
    Unsubscribe = 0x21,
    /// Commits a consumer offset for checkpointing.
    CommitOffset = 0x22,
    /// Server acknowledgment of a subscription.
    SubscribeAck = 0x23,
    /// Server acknowledgment of an offset commit.
    CommitAck = 0x24,

    // ---- Responses: 0x80 and above ----
    TopicResponse = 0x80,
    /// Reply carrying cluster status.
    ClusterStatusResponse = 0x81,
    ErrorResponse = 0xFF,
}
79
80impl From<u8> for ControlCommand {
81    fn from(value: u8) -> Self {
82        match value {
83            0x01 => Self::CreateTopic,
84            0x02 => Self::DeleteTopic,
85            0x03 => Self::ListTopics,
86            0x04 => Self::GetTopic,
87            0x05 => Self::SetRetention,
88            0x06 => Self::CreateTopicWithRetention,
89            0x07 => Self::GetClusterStatus,
90            0x08 => Self::Authenticate,
91            0x09 => Self::AuthenticateResponse,
92            0x10 => Self::Fetch,
93            0x11 => Self::FetchResponse,
94            0x12 => Self::CatchingUp,
95            0x20 => Self::Subscribe,
96            0x21 => Self::Unsubscribe,
97            0x22 => Self::CommitOffset,
98            0x23 => Self::SubscribeAck,
99            0x24 => Self::CommitAck,
100            0x80 => Self::TopicResponse,
101            0x81 => Self::ClusterStatusResponse,
102            0xFF => Self::ErrorResponse,
103            _ => Self::ErrorResponse,
104        }
105    }
106}
107
108#[derive(Debug, Clone, Copy)]
109pub struct LwpHeader {
110    pub magic: [u8; 4],
111    pub version: u8,
112    pub flags: u8,
113    pub reserved: [u8; 2],
114    pub header_crc: u32,
115    pub ingest_header: IngestHeader,
116}
117
118impl LwpHeader {
119    pub const SIZE: usize = 44;
120
121    pub fn new(flags: u8, ingest_header: IngestHeader) -> Self {
122        let mut header = Self {
123            magic: LANCE_MAGIC,
124            version: PROTOCOL_VERSION,
125            flags,
126            reserved: [0; 2],
127            header_crc: 0,
128            ingest_header,
129        };
130
131        let mut crc_buf = [0u8; 8];
132        crc_buf[0..4].copy_from_slice(&header.magic);
133        crc_buf[4] = header.version;
134        crc_buf[5] = header.flags;
135        crc_buf[6..8].copy_from_slice(&header.reserved);
136        header.header_crc = crc32c(&crc_buf);
137
138        header
139    }
140
141    pub fn parse(buf: &[u8]) -> Result<Self> {
142        if buf.len() < Self::SIZE {
143            return Err(LanceError::Protocol(
144                "Buffer too small for LWP header".into(),
145            ));
146        }
147
148        if buf[0..4] != LANCE_MAGIC {
149            return Err(LanceError::InvalidMagic);
150        }
151
152        let version = buf[4];
153        if version != PROTOCOL_VERSION {
154            return Err(LanceError::Protocol(format!(
155                "Unsupported protocol version: {}",
156                version
157            )));
158        }
159
160        let stored_crc = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);
161        let computed_crc = crc32c(&buf[0..8]);
162
163        if stored_crc != computed_crc {
164            return Err(LanceError::CrcMismatch {
165                expected: stored_crc,
166                actual: computed_crc,
167            });
168        }
169
170        let mut magic = [0u8; 4];
171        magic.copy_from_slice(&buf[0..4]);
172
173        let ingest_header = IngestHeader::parse(&buf[12..Self::SIZE])?;
174
175        Ok(Self {
176            magic,
177            version: buf[4],
178            flags: buf[5],
179            reserved: [buf[6], buf[7]],
180            header_crc: stored_crc,
181            ingest_header,
182        })
183    }
184
185    pub fn encode(&self) -> [u8; Self::SIZE] {
186        let mut buf = [0u8; Self::SIZE];
187        buf[0..4].copy_from_slice(&self.magic);
188        buf[4] = self.version;
189        buf[5] = self.flags;
190        buf[6..8].copy_from_slice(&self.reserved);
191        buf[8..12].copy_from_slice(&self.header_crc.to_le_bytes());
192        self.ingest_header.encode_into(&mut buf[12..Self::SIZE]);
193        buf
194    }
195
196    #[inline]
197    pub fn has_flag(&self, flag: LwpFlags) -> bool {
198        self.flags & (flag as u8) != 0
199    }
200
201    #[inline]
202    pub fn is_keepalive(&self) -> bool {
203        self.has_flag(LwpFlags::Keepalive)
204    }
205
206    #[inline]
207    pub fn is_batch_mode(&self) -> bool {
208        self.has_flag(LwpFlags::BatchMode)
209    }
210
211    #[inline]
212    pub fn is_compressed(&self) -> bool {
213        self.has_flag(LwpFlags::Compressed)
214    }
215
216    #[inline]
217    pub fn is_control(&self) -> bool {
218        self.has_flag(LwpFlags::Control)
219    }
220
221    /// Create a keepalive header
222    pub fn keepalive() -> Self {
223        Self::new(LwpFlags::Keepalive as u8, IngestHeader::default())
224    }
225}
226
227/// Zero-copy ingest header per Architecture §13.3.
228///
229/// `#[repr(C)]` ensures deterministic field layout matching the wire format.
230/// `zerocopy::{FromBytes, IntoBytes}` enables zero-copy casting from network
231/// buffers without per-field deserialization.
232#[repr(C)]
233#[derive(Debug, Clone, Copy, Default, FromBytes, Immutable, IntoBytes)]
234pub struct IngestHeader {
235    pub batch_id: u64,
236    pub timestamp_ns: u64,
237    pub record_count: u32,
238    pub payload_length: u32,
239    pub payload_crc: u32,
240    pub topic_id: u32,
241}
242
243impl IngestHeader {
244    pub const SIZE: usize = 32;
245
246    pub fn new(
247        batch_id: u64,
248        timestamp_ns: u64,
249        record_count: u32,
250        payload_length: u32,
251        payload_crc: u32,
252    ) -> Self {
253        Self {
254            batch_id,
255            timestamp_ns,
256            record_count,
257            payload_length,
258            payload_crc,
259            topic_id: 0,
260        }
261    }
262
263    pub fn with_topic(
264        batch_id: u64,
265        timestamp_ns: u64,
266        record_count: u32,
267        payload_length: u32,
268        payload_crc: u32,
269        topic_id: u32,
270    ) -> Self {
271        Self {
272            batch_id,
273            timestamp_ns,
274            record_count,
275            payload_length,
276            payload_crc,
277            topic_id,
278        }
279    }
280
281    /// Zero-copy parse: reinterpret a byte slice as an `IngestHeader`.
282    ///
283    /// On little-endian architectures this is a simple pointer cast with
284    /// no field-by-field deserialization.  On big-endian it falls back to
285    /// the per-field path (compile-time).
286    pub fn parse(buf: &[u8]) -> Result<Self> {
287        if buf.len() < Self::SIZE {
288            return Err(LanceError::Protocol(
289                "Buffer too small for IngestHeader".into(),
290            ));
291        }
292
293        #[cfg(target_endian = "little")]
294        {
295            // SAFETY: #[repr(C)] layout matches wire format on LE.
296            // zerocopy::FromBytes guarantees the cast is sound if the
297            // buffer is large enough (checked above).
298            if let Ok(header) = Self::read_from_bytes(&buf[..Self::SIZE]) {
299                return Ok(header);
300            }
301        }
302
303        // Fallback: manual field-by-field parse (big-endian or alignment miss)
304        Ok(Self {
305            batch_id: u64::from_le_bytes([
306                buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
307            ]),
308            timestamp_ns: u64::from_le_bytes([
309                buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
310            ]),
311            record_count: u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]),
312            payload_length: u32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]),
313            payload_crc: u32::from_le_bytes([buf[24], buf[25], buf[26], buf[27]]),
314            topic_id: u32::from_le_bytes([buf[28], buf[29], buf[30], buf[31]]),
315        })
316    }
317
318    /// Zero-copy encode: write struct bytes directly into buffer.
319    ///
320    /// On little-endian architectures this is a single memcpy.
321    pub fn encode_into(&self, buf: &mut [u8]) {
322        #[cfg(target_endian = "little")]
323        {
324            let src = self.as_bytes();
325            buf[..Self::SIZE].copy_from_slice(src);
326        }
327
328        #[cfg(target_endian = "big")]
329        {
330            buf[0..8].copy_from_slice(&self.batch_id.to_le_bytes());
331            buf[8..16].copy_from_slice(&self.timestamp_ns.to_le_bytes());
332            buf[16..20].copy_from_slice(&self.record_count.to_le_bytes());
333            buf[20..24].copy_from_slice(&self.payload_length.to_le_bytes());
334            buf[24..28].copy_from_slice(&self.payload_crc.to_le_bytes());
335            buf[28..32].copy_from_slice(&self.topic_id.to_le_bytes());
336        }
337    }
338
339    pub fn validate_payload(&self, payload: &[u8]) -> Result<()> {
340        if payload.len() != self.payload_length as usize {
341            return Err(LanceError::Protocol(format!(
342                "Payload length mismatch: expected {}, got {}",
343                self.payload_length,
344                payload.len()
345            )));
346        }
347
348        let actual_crc = crc32c(payload);
349        if actual_crc != self.payload_crc {
350            lnc_metrics::increment_crc_failures();
351            return Err(LanceError::CrcMismatch {
352                expected: self.payload_crc,
353                actual: actual_crc,
354            });
355        }
356
357        Ok(())
358    }
359}
360
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is reported as a test failure.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Every command that has its own code, `ErrorResponse` included.
    const ALL_COMMANDS: &[ControlCommand] = &[
        ControlCommand::CreateTopic,
        ControlCommand::DeleteTopic,
        ControlCommand::ListTopics,
        ControlCommand::GetTopic,
        ControlCommand::SetRetention,
        ControlCommand::CreateTopicWithRetention,
        ControlCommand::GetClusterStatus,
        ControlCommand::Authenticate,
        ControlCommand::AuthenticateResponse,
        ControlCommand::Fetch,
        ControlCommand::FetchResponse,
        ControlCommand::CatchingUp,
        ControlCommand::Subscribe,
        ControlCommand::Unsubscribe,
        ControlCommand::CommitOffset,
        ControlCommand::SubscribeAck,
        ControlCommand::CommitAck,
        ControlCommand::TopicResponse,
        ControlCommand::ClusterStatusResponse,
        ControlCommand::ErrorResponse,
    ];

    /// A valid encoded header with a non-zero value in every ingest field.
    fn sample_encoded() -> [u8; LwpHeader::SIZE] {
        let ingest = IngestHeader::with_topic(12345, 1_000_000_000, 100, 4096, 0xDEADBEEF, 7);
        LwpHeader::new(LwpFlags::BatchMode as u8, ingest).encode()
    }

    // =========================================================================
    // LwpHeader Tests
    // =========================================================================

    #[test]
    fn test_lwp_header_roundtrip() {
        let ingest = IngestHeader::new(12345, 1_000_000_000, 100, 4096, 0xDEADBEEF);
        let header = LwpHeader::new(LwpFlags::BatchMode as u8, ingest);

        let encoded = header.encode();
        let parsed = LwpHeader::parse(&encoded).unwrap();

        assert_eq!(parsed.magic, LANCE_MAGIC);
        assert_eq!(parsed.version, PROTOCOL_VERSION);
        assert_eq!(parsed.flags, LwpFlags::BatchMode as u8);
        assert_eq!(parsed.reserved, [0u8; 2]);
        assert_eq!(parsed.header_crc, header.header_crc);
        assert_eq!(parsed.ingest_header.batch_id, 12345);
        assert_eq!(parsed.ingest_header.timestamp_ns, 1_000_000_000);
        assert_eq!(parsed.ingest_header.record_count, 100);
        assert_eq!(parsed.ingest_header.payload_length, 4096);
        assert_eq!(parsed.ingest_header.payload_crc, 0xDEADBEEF);
        assert_eq!(parsed.ingest_header.topic_id, 0);
        assert!(parsed.is_batch_mode());
        assert!(!parsed.is_keepalive());
    }

    #[test]
    fn test_lwp_header_wire_layout() {
        let ingest = IngestHeader::new(1, 2, 3, 4, 5);
        let header = LwpHeader::new(LwpFlags::Compressed as u8, ingest);
        let encoded = header.encode();

        assert_eq!(&encoded[0..4], &LANCE_MAGIC[..]);
        assert_eq!(encoded[4], PROTOCOL_VERSION);
        assert_eq!(encoded[5], LwpFlags::Compressed as u8);
        assert_eq!(&encoded[6..8], &[0u8, 0u8][..]);
        assert_eq!(&encoded[8..12], &header.header_crc.to_le_bytes()[..]);
        assert_eq!(header.header_crc, crc32c(&encoded[0..8]));
        // The ingest header starts right after the 12-byte prefix.
        assert_eq!(&encoded[12..20], &1u64.to_le_bytes()[..]);
    }

    #[test]
    fn test_invalid_magic() {
        let mut buf = [0u8; LwpHeader::SIZE];
        buf[0..4].copy_from_slice(b"JUNK");

        let result = LwpHeader::parse(&buf);
        assert!(matches!(result, Err(LanceError::InvalidMagic)));
    }

    #[test]
    fn test_header_buffer_too_small() {
        let encoded = sample_encoded();

        let result = LwpHeader::parse(&encoded[..LwpHeader::SIZE - 1]);
        assert!(matches!(result, Err(LanceError::Protocol(_))));

        let result = LwpHeader::parse(&[]);
        assert!(matches!(result, Err(LanceError::Protocol(_))));
    }

    #[test]
    fn test_header_unsupported_version() {
        let mut encoded = sample_encoded();
        encoded[4] = PROTOCOL_VERSION.wrapping_add(1);

        // The version is checked before the CRC, so this is a protocol error.
        let result = LwpHeader::parse(&encoded);
        assert!(matches!(result, Err(LanceError::Protocol(_))));
    }

    #[test]
    fn test_header_crc_mismatch() {
        let mut encoded = sample_encoded();
        // Flip one flag bit without touching the stored CRC.
        encoded[5] ^= 0x01;

        let result = LwpHeader::parse(&encoded);
        assert!(matches!(result, Err(LanceError::CrcMismatch { .. })));
    }

    #[test]
    fn test_keepalive() {
        let header = LwpHeader::keepalive();
        assert!(header.is_keepalive());
        assert!(!header.is_batch_mode());
    }

    #[test]
    fn test_flag_predicates() {
        let flags = LwpFlags::Compressed as u8 | LwpFlags::Control as u8;
        let header = LwpHeader::new(flags, IngestHeader::default());

        assert!(header.is_compressed());
        assert!(header.is_control());
        assert!(!header.is_batch_mode());
        assert!(!header.is_keepalive());
        assert!(!header.has_flag(LwpFlags::None));
    }

    // =========================================================================
    // LwpFlags Parsing Tests
    // =========================================================================

    #[test]
    fn test_lwp_flags_from_u8() {
        assert_eq!(LwpFlags::from(0x01), LwpFlags::Compressed);
        assert_eq!(LwpFlags::from(0x20), LwpFlags::Keepalive);
        assert_eq!(LwpFlags::from(0x40), LwpFlags::Control);
        // Zero, unassigned bits and combined bits all collapse to None.
        assert_eq!(LwpFlags::from(0x00), LwpFlags::None);
        assert_eq!(LwpFlags::from(0x80), LwpFlags::None);
        assert_eq!(LwpFlags::from(0x03), LwpFlags::None);
    }

    // =========================================================================
    // IngestHeader Tests
    // =========================================================================

    #[test]
    fn test_ingest_header_roundtrip_with_topic() {
        let ingest = IngestHeader::with_topic(42, 7, 3, 128, 0xCAFEBABE, 9);
        let mut buf = [0u8; IngestHeader::SIZE];
        ingest.encode_into(&mut buf);

        assert_eq!(&buf[28..32], &9u32.to_le_bytes()[..]);

        let parsed = IngestHeader::parse(&buf).unwrap();
        assert_eq!(parsed.batch_id, 42);
        assert_eq!(parsed.timestamp_ns, 7);
        assert_eq!(parsed.record_count, 3);
        assert_eq!(parsed.payload_length, 128);
        assert_eq!(parsed.payload_crc, 0xCAFEBABE);
        assert_eq!(parsed.topic_id, 9);
    }

    #[test]
    fn test_ingest_header_buffer_too_small() {
        let buf = [0u8; IngestHeader::SIZE - 1];
        let result = IngestHeader::parse(&buf);
        assert!(matches!(result, Err(LanceError::Protocol(_))));
    }

    #[test]
    fn test_payload_validation() {
        let payload = b"test payload data";
        let crc = crc32c(payload);

        let ingest = IngestHeader::new(1, 0, 1, payload.len() as u32, crc);
        assert!(ingest.validate_payload(payload).is_ok());

        let wrong_crc_ingest = IngestHeader::new(1, 0, 1, payload.len() as u32, 0xBADBAD);
        assert!(wrong_crc_ingest.validate_payload(payload).is_err());
    }

    #[test]
    fn test_payload_length_mismatch() {
        let payload = b"test payload data";
        let crc = crc32c(payload);

        let ingest = IngestHeader::new(1, 0, 1, payload.len() as u32 + 1, crc);
        let result = ingest.validate_payload(payload);
        assert!(matches!(result, Err(LanceError::Protocol(_))));
    }

    // =========================================================================
    // ControlCommand Parsing Tests
    // =========================================================================

    #[test]
    fn test_control_command_topic_management() {
        assert_eq!(ControlCommand::from(0x01), ControlCommand::CreateTopic);
        assert_eq!(ControlCommand::from(0x02), ControlCommand::DeleteTopic);
        assert_eq!(ControlCommand::from(0x03), ControlCommand::ListTopics);
        assert_eq!(ControlCommand::from(0x04), ControlCommand::GetTopic);
        assert_eq!(ControlCommand::from(0x05), ControlCommand::SetRetention);
        assert_eq!(
            ControlCommand::from(0x06),
            ControlCommand::CreateTopicWithRetention
        );
    }

    #[test]
    fn test_control_command_cluster() {
        assert_eq!(ControlCommand::from(0x07), ControlCommand::GetClusterStatus);
        assert_eq!(
            ControlCommand::from(0x81),
            ControlCommand::ClusterStatusResponse
        );
    }

    #[test]
    fn test_control_command_fetch() {
        assert_eq!(ControlCommand::from(0x10), ControlCommand::Fetch);
        assert_eq!(ControlCommand::from(0x11), ControlCommand::FetchResponse);
        assert_eq!(ControlCommand::from(0x12), ControlCommand::CatchingUp);
    }

    #[test]
    fn test_control_command_streaming() {
        assert_eq!(ControlCommand::from(0x20), ControlCommand::Subscribe);
        assert_eq!(ControlCommand::from(0x21), ControlCommand::Unsubscribe);
        assert_eq!(ControlCommand::from(0x22), ControlCommand::CommitOffset);
        assert_eq!(ControlCommand::from(0x23), ControlCommand::SubscribeAck);
        assert_eq!(ControlCommand::from(0x24), ControlCommand::CommitAck);
    }

    #[test]
    fn test_control_command_responses() {
        assert_eq!(ControlCommand::from(0x80), ControlCommand::TopicResponse);
        assert_eq!(ControlCommand::from(0xFF), ControlCommand::ErrorResponse);
    }

    #[test]
    fn test_control_command_authentication() {
        assert_eq!(ControlCommand::from(0x08), ControlCommand::Authenticate);
        assert_eq!(
            ControlCommand::from(0x09),
            ControlCommand::AuthenticateResponse
        );
        assert_eq!(ControlCommand::Authenticate as u8, 0x08);
        assert_eq!(ControlCommand::AuthenticateResponse as u8, 0x09);
    }

    #[test]
    fn test_control_command_unknown_defaults_to_error() {
        // Unknown command codes should default to ErrorResponse
        assert_eq!(ControlCommand::from(0x00), ControlCommand::ErrorResponse);
        assert_eq!(ControlCommand::from(0x99), ControlCommand::ErrorResponse);
        assert_eq!(ControlCommand::from(0xFE), ControlCommand::ErrorResponse);
    }

    #[test]
    fn test_control_command_roundtrip() {
        // Verify command codes match enum values
        assert_eq!(ControlCommand::Subscribe as u8, 0x20);
        assert_eq!(ControlCommand::Unsubscribe as u8, 0x21);
        assert_eq!(ControlCommand::CommitOffset as u8, 0x22);
        assert_eq!(ControlCommand::SubscribeAck as u8, 0x23);
        assert_eq!(ControlCommand::CommitAck as u8, 0x24);
    }

    #[test]
    fn test_control_command_every_code_roundtrips() {
        // Converting a command to its code and back must give the same command.
        for &command in ALL_COMMANDS {
            assert_eq!(ControlCommand::from(command as u8), command);
        }
    }
}