Skip to main content

lnc_network/
protocol.rs

1use lnc_core::{LANCE_MAGIC, LanceError, Result, crc32c};
2
/// Wire protocol version.
///
/// `LwpHeader::new` writes this value into the header's `version` byte, and
/// `LwpHeader::parse` rejects any header whose version byte differs from it.
pub const PROTOCOL_VERSION: u8 = 1;
4
/// Flag values for the `flags` byte of an LWP header.
///
/// Every variant except [`LwpFlags::None`] occupies exactly one bit, so a
/// header's `flags` byte can be tested against a variant with a bit mask.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LwpFlags {
    /// No bit set (`0x00`).
    None = 0,
    /// Bit 0 (`0x01`).
    Compressed = 1 << 0,
    /// Bit 1 (`0x02`).
    Encrypted = 1 << 1,
    /// Bit 2 (`0x04`).
    BatchMode = 1 << 2,
    /// Bit 3 (`0x08`).
    Ack = 1 << 3,
    /// Bit 4 (`0x10`).
    Backpressure = 1 << 4,
    /// Bit 5 (`0x20`).
    Keepalive = 1 << 5,
    /// Bit 6 (`0x40`).
    Control = 1 << 6,
}

impl From<u8> for LwpFlags {
    /// Converts a byte that is exactly one flag bit into that flag.
    ///
    /// Every other value — zero, an unassigned bit, or a combination of
    /// several bits — converts to [`LwpFlags::None`].
    fn from(value: u8) -> Self {
        // All variants that own a bit; `None` is the fallback, not an entry.
        const SINGLE_BIT_FLAGS: &[LwpFlags] = &[
            LwpFlags::Compressed,
            LwpFlags::Encrypted,
            LwpFlags::BatchMode,
            LwpFlags::Ack,
            LwpFlags::Backpressure,
            LwpFlags::Keepalive,
            LwpFlags::Control,
        ];

        SINGLE_BIT_FLAGS
            .iter()
            .copied()
            .find(|flag| *flag as u8 == value)
            .unwrap_or(Self::None)
    }
}
32
/// One-byte control command codes.
///
/// Converting from a raw `u8` never fails: a code that is not listed here
/// converts to [`ControlCommand::ErrorResponse`].
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlCommand {
    // Topic management (0x01-0x06)
    CreateTopic = 0x01,
    DeleteTopic = 0x02,
    ListTopics = 0x03,
    GetTopic = 0x04,
    /// Set retention policy for a topic
    SetRetention = 0x05,
    /// Create topic with retention configuration
    CreateTopicWithRetention = 0x06,

    // Cluster management and authentication (0x07-0x0F)
    /// Get cluster status and health information
    GetClusterStatus = 0x07,
    /// Authenticate client with token
    Authenticate = 0x08,
    /// Authentication response (success/failure)
    AuthenticateResponse = 0x09,

    // Data fetch - request/response (0x10-0x1F)
    Fetch = 0x10,
    FetchResponse = 0x11,

    // Streaming control (0x20-0x2F)
    /// Subscribe to a topic for streaming - server will push data
    Subscribe = 0x20,
    /// Unsubscribe from a topic - stop streaming
    Unsubscribe = 0x21,
    /// Commit consumer offset for checkpointing
    CommitOffset = 0x22,
    /// Server acknowledgment of subscription
    SubscribeAck = 0x23,
    /// Server acknowledgment of offset commit
    CommitAck = 0x24,

    // Responses (0x80+)
    TopicResponse = 0x80,
    /// Cluster status response
    ClusterStatusResponse = 0x81,
    ErrorResponse = 0xFF,
}

impl From<u8> for ControlCommand {
    /// Decodes a raw command byte.
    ///
    /// A byte equal to a variant's discriminant yields that variant; any
    /// other byte yields [`ControlCommand::ErrorResponse`].
    fn from(value: u8) -> Self {
        // Every variant of the enum; decoding is a discriminant lookup.
        const KNOWN_COMMANDS: &[ControlCommand] = &[
            ControlCommand::CreateTopic,
            ControlCommand::DeleteTopic,
            ControlCommand::ListTopics,
            ControlCommand::GetTopic,
            ControlCommand::SetRetention,
            ControlCommand::CreateTopicWithRetention,
            ControlCommand::GetClusterStatus,
            ControlCommand::Authenticate,
            ControlCommand::AuthenticateResponse,
            ControlCommand::Fetch,
            ControlCommand::FetchResponse,
            ControlCommand::Subscribe,
            ControlCommand::Unsubscribe,
            ControlCommand::CommitOffset,
            ControlCommand::SubscribeAck,
            ControlCommand::CommitAck,
            ControlCommand::TopicResponse,
            ControlCommand::ClusterStatusResponse,
            ControlCommand::ErrorResponse,
        ];

        KNOWN_COMMANDS
            .iter()
            .copied()
            .find(|command| *command as u8 == value)
            .unwrap_or(Self::ErrorResponse)
    }
}
103
104#[derive(Debug, Clone, Copy)]
105pub struct LwpHeader {
106    pub magic: [u8; 4],
107    pub version: u8,
108    pub flags: u8,
109    pub reserved: [u8; 2],
110    pub header_crc: u32,
111    pub ingest_header: IngestHeader,
112}
113
114impl LwpHeader {
115    pub const SIZE: usize = 44;
116
117    pub fn new(flags: u8, ingest_header: IngestHeader) -> Self {
118        let mut header = Self {
119            magic: LANCE_MAGIC,
120            version: PROTOCOL_VERSION,
121            flags,
122            reserved: [0; 2],
123            header_crc: 0,
124            ingest_header,
125        };
126
127        let mut crc_buf = [0u8; 8];
128        crc_buf[0..4].copy_from_slice(&header.magic);
129        crc_buf[4] = header.version;
130        crc_buf[5] = header.flags;
131        crc_buf[6..8].copy_from_slice(&header.reserved);
132        header.header_crc = crc32c(&crc_buf);
133
134        header
135    }
136
137    pub fn parse(buf: &[u8]) -> Result<Self> {
138        if buf.len() < Self::SIZE {
139            return Err(LanceError::Protocol(
140                "Buffer too small for LWP header".into(),
141            ));
142        }
143
144        if buf[0..4] != LANCE_MAGIC {
145            return Err(LanceError::InvalidMagic);
146        }
147
148        let version = buf[4];
149        if version != PROTOCOL_VERSION {
150            return Err(LanceError::Protocol(format!(
151                "Unsupported protocol version: {}",
152                version
153            )));
154        }
155
156        let stored_crc = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);
157        let computed_crc = crc32c(&buf[0..8]);
158
159        if stored_crc != computed_crc {
160            return Err(LanceError::CrcMismatch {
161                expected: stored_crc,
162                actual: computed_crc,
163            });
164        }
165
166        let mut magic = [0u8; 4];
167        magic.copy_from_slice(&buf[0..4]);
168
169        let ingest_header = IngestHeader::parse(&buf[12..Self::SIZE])?;
170
171        Ok(Self {
172            magic,
173            version: buf[4],
174            flags: buf[5],
175            reserved: [buf[6], buf[7]],
176            header_crc: stored_crc,
177            ingest_header,
178        })
179    }
180
181    pub fn encode(&self) -> [u8; Self::SIZE] {
182        let mut buf = [0u8; Self::SIZE];
183        buf[0..4].copy_from_slice(&self.magic);
184        buf[4] = self.version;
185        buf[5] = self.flags;
186        buf[6..8].copy_from_slice(&self.reserved);
187        buf[8..12].copy_from_slice(&self.header_crc.to_le_bytes());
188        self.ingest_header.encode_into(&mut buf[12..Self::SIZE]);
189        buf
190    }
191
192    #[inline]
193    pub fn has_flag(&self, flag: LwpFlags) -> bool {
194        self.flags & (flag as u8) != 0
195    }
196
197    #[inline]
198    pub fn is_keepalive(&self) -> bool {
199        self.has_flag(LwpFlags::Keepalive)
200    }
201
202    #[inline]
203    pub fn is_batch_mode(&self) -> bool {
204        self.has_flag(LwpFlags::BatchMode)
205    }
206
207    #[inline]
208    pub fn is_compressed(&self) -> bool {
209        self.has_flag(LwpFlags::Compressed)
210    }
211
212    #[inline]
213    pub fn is_control(&self) -> bool {
214        self.has_flag(LwpFlags::Control)
215    }
216
217    /// Create a keepalive header
218    pub fn keepalive() -> Self {
219        Self::new(LwpFlags::Keepalive as u8, IngestHeader::default())
220    }
221}
222
223#[derive(Debug, Clone, Copy, Default)]
224pub struct IngestHeader {
225    pub batch_id: u64,
226    pub timestamp_ns: u64,
227    pub record_count: u32,
228    pub payload_length: u32,
229    pub payload_crc: u32,
230    pub topic_id: u32,
231}
232
233impl IngestHeader {
234    pub const SIZE: usize = 32;
235
236    pub fn new(
237        batch_id: u64,
238        timestamp_ns: u64,
239        record_count: u32,
240        payload_length: u32,
241        payload_crc: u32,
242    ) -> Self {
243        Self {
244            batch_id,
245            timestamp_ns,
246            record_count,
247            payload_length,
248            payload_crc,
249            topic_id: 0,
250        }
251    }
252
253    pub fn with_topic(
254        batch_id: u64,
255        timestamp_ns: u64,
256        record_count: u32,
257        payload_length: u32,
258        payload_crc: u32,
259        topic_id: u32,
260    ) -> Self {
261        Self {
262            batch_id,
263            timestamp_ns,
264            record_count,
265            payload_length,
266            payload_crc,
267            topic_id,
268        }
269    }
270
271    pub fn parse(buf: &[u8]) -> Result<Self> {
272        if buf.len() < Self::SIZE {
273            return Err(LanceError::Protocol(
274                "Buffer too small for IngestHeader".into(),
275            ));
276        }
277
278        Ok(Self {
279            batch_id: u64::from_le_bytes([
280                buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
281            ]),
282            timestamp_ns: u64::from_le_bytes([
283                buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
284            ]),
285            record_count: u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]),
286            payload_length: u32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]),
287            payload_crc: u32::from_le_bytes([buf[24], buf[25], buf[26], buf[27]]),
288            topic_id: u32::from_le_bytes([buf[28], buf[29], buf[30], buf[31]]),
289        })
290    }
291
292    pub fn encode_into(&self, buf: &mut [u8]) {
293        buf[0..8].copy_from_slice(&self.batch_id.to_le_bytes());
294        buf[8..16].copy_from_slice(&self.timestamp_ns.to_le_bytes());
295        buf[16..20].copy_from_slice(&self.record_count.to_le_bytes());
296        buf[20..24].copy_from_slice(&self.payload_length.to_le_bytes());
297        buf[24..28].copy_from_slice(&self.payload_crc.to_le_bytes());
298        buf[28..32].copy_from_slice(&self.topic_id.to_le_bytes());
299    }
300
301    pub fn validate_payload(&self, payload: &[u8]) -> Result<()> {
302        if payload.len() != self.payload_length as usize {
303            return Err(LanceError::Protocol(format!(
304                "Payload length mismatch: expected {}, got {}",
305                self.payload_length,
306                payload.len()
307            )));
308        }
309
310        let actual_crc = crc32c(payload);
311        if actual_crc != self.payload_crc {
312            lnc_metrics::increment_crc_failures();
313            return Err(LanceError::CrcMismatch {
314                expected: self.payload_crc,
315                actual: actual_crc,
316            });
317        }
318
319        Ok(())
320    }
321}
322
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Asserts that each raw code decodes to the paired command.
    fn assert_decodes(cases: &[(u8, ControlCommand)]) {
        for &(code, expected) in cases {
            assert_eq!(ControlCommand::from(code), expected);
        }
    }

    /// Asserts that each command casts to the paired raw code.
    fn assert_encodes(cases: &[(ControlCommand, u8)]) {
        for &(command, expected) in cases {
            assert_eq!(command as u8, expected);
        }
    }

    #[test]
    fn test_lwp_header_roundtrip() {
        let header = LwpHeader::new(
            LwpFlags::BatchMode as u8,
            IngestHeader::new(12345, 1_000_000_000, 100, 4096, 0xDEADBEEF),
        );

        let wire = header.encode();
        let parsed = LwpHeader::parse(&wire).unwrap();

        assert_eq!(parsed.magic, LANCE_MAGIC);
        assert_eq!(parsed.version, PROTOCOL_VERSION);
        assert_eq!(parsed.ingest_header.batch_id, 12345);
        assert_eq!(parsed.ingest_header.record_count, 100);
    }

    #[test]
    fn test_invalid_magic() {
        // All-zero header except for a bogus magic.
        let mut wire = [0u8; LwpHeader::SIZE];
        wire[..4].copy_from_slice(b"JUNK");

        assert!(matches!(
            LwpHeader::parse(&wire),
            Err(LanceError::InvalidMagic)
        ));
    }

    #[test]
    fn test_keepalive() {
        let header = LwpHeader::keepalive();
        assert!(header.is_keepalive());
        assert!(!header.is_batch_mode());
    }

    #[test]
    fn test_payload_validation() {
        let payload = b"test payload data";
        let payload_len = payload.len() as u32;

        let matching = IngestHeader::new(1, 0, 1, payload_len, crc32c(payload));
        assert!(matching.validate_payload(payload).is_ok());

        let wrong_crc = IngestHeader::new(1, 0, 1, payload_len, 0xBADBAD);
        assert!(wrong_crc.validate_payload(payload).is_err());
    }

    // =========================================================================
    // ControlCommand Parsing Tests
    // =========================================================================

    #[test]
    fn test_control_command_topic_management() {
        assert_decodes(&[
            (0x01, ControlCommand::CreateTopic),
            (0x02, ControlCommand::DeleteTopic),
            (0x03, ControlCommand::ListTopics),
            (0x04, ControlCommand::GetTopic),
        ]);
    }

    #[test]
    fn test_control_command_fetch() {
        assert_decodes(&[
            (0x10, ControlCommand::Fetch),
            (0x11, ControlCommand::FetchResponse),
        ]);
    }

    #[test]
    fn test_control_command_streaming() {
        assert_decodes(&[
            (0x20, ControlCommand::Subscribe),
            (0x21, ControlCommand::Unsubscribe),
            (0x22, ControlCommand::CommitOffset),
            (0x23, ControlCommand::SubscribeAck),
            (0x24, ControlCommand::CommitAck),
        ]);
    }

    #[test]
    fn test_control_command_responses() {
        assert_decodes(&[
            (0x80, ControlCommand::TopicResponse),
            (0xFF, ControlCommand::ErrorResponse),
        ]);
    }

    #[test]
    fn test_control_command_authentication() {
        assert_decodes(&[
            (0x08, ControlCommand::Authenticate),
            (0x09, ControlCommand::AuthenticateResponse),
        ]);
        assert_encodes(&[
            (ControlCommand::Authenticate, 0x08),
            (ControlCommand::AuthenticateResponse, 0x09),
        ]);
    }

    #[test]
    fn test_control_command_unknown_defaults_to_error() {
        // Unknown command codes should default to ErrorResponse
        for &code in &[0x00u8, 0x99, 0xFE] {
            assert_eq!(ControlCommand::from(code), ControlCommand::ErrorResponse);
        }
    }

    #[test]
    fn test_control_command_roundtrip() {
        // Verify command codes match enum values
        assert_encodes(&[
            (ControlCommand::Subscribe, 0x20),
            (ControlCommand::Unsubscribe, 0x21),
            (ControlCommand::CommitOffset, 0x22),
            (ControlCommand::SubscribeAck, 0x23),
            (ControlCommand::CommitAck, 0x24),
        ]);
    }
}