Skip to main content

lnc_network/
frame.rs

1use crate::LWP_HEADER_SIZE;
2use crate::protocol::{ControlCommand, IngestHeader, LwpFlags, LwpHeader};
3use bytes::Bytes;
4use lnc_core::{Result, crc32c};
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum FrameType {
8    Ingest,
9    Ack,
10    Backpressure,
11    Keepalive,
12    Control(ControlCommand),
13    Unknown,
14}
15
16pub struct Frame {
17    pub header: LwpHeader,
18    pub payload: Option<Bytes>,
19    pub frame_type: FrameType,
20}
21
22impl Frame {
23    pub fn new_ingest(batch_id: u64, timestamp_ns: u64, record_count: u32, payload: Bytes) -> Self {
24        Self::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, 0)
25    }
26
27    pub fn new_ingest_with_topic(
28        batch_id: u64,
29        timestamp_ns: u64,
30        record_count: u32,
31        payload: Bytes,
32        topic_id: u32,
33    ) -> Self {
34        let payload_crc = crc32c(&payload);
35        let ingest = IngestHeader::with_topic(
36            batch_id,
37            timestamp_ns,
38            record_count,
39            payload.len() as u32,
40            payload_crc,
41            topic_id,
42        );
43        let header = LwpHeader::new(LwpFlags::BatchMode as u8, ingest);
44
45        Self {
46            header,
47            payload: Some(payload),
48            frame_type: FrameType::Ingest,
49        }
50    }
51
52    pub fn new_ack(batch_id: u64) -> Self {
53        let ingest = IngestHeader {
54            batch_id,
55            ..Default::default()
56        };
57        let header = LwpHeader::new(LwpFlags::Ack as u8, ingest);
58
59        Self {
60            header,
61            payload: None,
62            frame_type: FrameType::Ack,
63        }
64    }
65
66    pub fn new_keepalive() -> Self {
67        let header = LwpHeader::new(LwpFlags::Keepalive as u8, IngestHeader::default());
68
69        Self {
70            header,
71            payload: None,
72            frame_type: FrameType::Keepalive,
73        }
74    }
75
76    pub fn new_backpressure() -> Self {
77        let header = LwpHeader::new(LwpFlags::Backpressure as u8, IngestHeader::default());
78
79        Self {
80            header,
81            payload: None,
82            frame_type: FrameType::Backpressure,
83        }
84    }
85
86    pub fn new_control(command: ControlCommand, payload: Option<Bytes>) -> Self {
87        let (payload_len, payload_crc) = match &payload {
88            Some(p) => (p.len() as u32, crc32c(p)),
89            None => (0, 0),
90        };
91
92        let ingest = IngestHeader {
93            payload_length: payload_len,
94            payload_crc,
95            batch_id: command as u64,
96            ..Default::default()
97        };
98
99        let header = LwpHeader::new(LwpFlags::Control as u8, ingest);
100
101        Self {
102            header,
103            payload,
104            frame_type: FrameType::Control(command),
105        }
106    }
107
108    pub fn new_create_topic(topic_name: &str) -> Self {
109        Self::new_control(
110            ControlCommand::CreateTopic,
111            Some(Bytes::copy_from_slice(topic_name.as_bytes())),
112        )
113    }
114
115    pub fn new_list_topics() -> Self {
116        Self::new_control(ControlCommand::ListTopics, None)
117    }
118
119    pub fn new_get_topic(topic_id: u32) -> Self {
120        Self::new_control(
121            ControlCommand::GetTopic,
122            Some(Bytes::copy_from_slice(&topic_id.to_le_bytes())),
123        )
124    }
125
126    pub fn new_delete_topic(topic_id: u32) -> Self {
127        Self::new_control(
128            ControlCommand::DeleteTopic,
129            Some(Bytes::copy_from_slice(&topic_id.to_le_bytes())),
130        )
131    }
132
133    /// Create a set retention policy request frame
134    /// Payload format: topic_id(4) + max_age_secs(8) + max_bytes(8)
135    pub fn new_set_retention(topic_id: u32, max_age_secs: u64, max_bytes: u64) -> Self {
136        let mut payload = Vec::with_capacity(20);
137        payload.extend_from_slice(&topic_id.to_le_bytes());
138        payload.extend_from_slice(&max_age_secs.to_le_bytes());
139        payload.extend_from_slice(&max_bytes.to_le_bytes());
140        Self::new_control(ControlCommand::SetRetention, Some(Bytes::from(payload)))
141    }
142
143    /// Create a topic with retention configuration
144    /// Payload format: name_len(2) + name(var) + max_age_secs(8) + max_bytes(8)
145    pub fn new_create_topic_with_retention(
146        topic_name: &str,
147        max_age_secs: u64,
148        max_bytes: u64,
149    ) -> Self {
150        let name_bytes = topic_name.as_bytes();
151        let mut payload = Vec::with_capacity(2 + name_bytes.len() + 16);
152        payload.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
153        payload.extend_from_slice(name_bytes);
154        payload.extend_from_slice(&max_age_secs.to_le_bytes());
155        payload.extend_from_slice(&max_bytes.to_le_bytes());
156        Self::new_control(
157            ControlCommand::CreateTopicWithRetention,
158            Some(Bytes::from(payload)),
159        )
160    }
161
162    /// Create a fetch request frame
163    /// Payload format: topic_id(4) + start_offset(8) + max_bytes(4)
164    pub fn new_fetch(topic_id: u32, start_offset: u64, max_bytes: u32) -> Self {
165        let mut payload = Vec::with_capacity(16);
166        payload.extend_from_slice(&topic_id.to_le_bytes());
167        payload.extend_from_slice(&start_offset.to_le_bytes());
168        payload.extend_from_slice(&max_bytes.to_le_bytes());
169        Self::new_control(ControlCommand::Fetch, Some(Bytes::from(payload)))
170    }
171
172    pub fn new_fetch_response(payload: Bytes) -> Self {
173        Self::new_control(ControlCommand::FetchResponse, Some(payload))
174    }
175
176    /// Create a get cluster status request frame
177    pub fn new_get_cluster_status() -> Self {
178        Self::new_control(ControlCommand::GetClusterStatus, None)
179    }
180
181    /// Create a cluster status response frame
182    pub fn new_cluster_status_response(payload: Bytes) -> Self {
183        Self::new_control(ControlCommand::ClusterStatusResponse, Some(payload))
184    }
185
186    /// Create an authenticate request frame
187    /// Payload format: token (UTF-8 string bytes)
188    pub fn new_authenticate(token: &str) -> Self {
189        Self::new_control(
190            ControlCommand::Authenticate,
191            Some(Bytes::copy_from_slice(token.as_bytes())),
192        )
193    }
194
195    /// Create an authenticate response frame
196    /// Payload format: success(1) + optional message
197    pub fn new_authenticate_response(success: bool, message: Option<&str>) -> Self {
198        let mut payload = Vec::with_capacity(1 + message.map_or(0, |m| m.len()));
199        payload.push(if success { 1 } else { 0 });
200        if let Some(msg) = message {
201            payload.extend_from_slice(msg.as_bytes());
202        }
203        Self::new_control(
204            ControlCommand::AuthenticateResponse,
205            Some(Bytes::from(payload)),
206        )
207    }
208
209    /// Create a subscribe request frame
210    /// Payload format: topic_id(4) + start_offset(8) + max_batch_bytes(4) + consumer_id(8)
211    pub fn new_subscribe(
212        topic_id: u32,
213        start_offset: u64,
214        max_batch_bytes: u32,
215        consumer_id: u64,
216    ) -> Self {
217        let mut payload = Vec::with_capacity(24);
218        payload.extend_from_slice(&topic_id.to_le_bytes());
219        payload.extend_from_slice(&start_offset.to_le_bytes());
220        payload.extend_from_slice(&max_batch_bytes.to_le_bytes());
221        payload.extend_from_slice(&consumer_id.to_le_bytes());
222        Self::new_control(ControlCommand::Subscribe, Some(Bytes::from(payload)))
223    }
224
225    /// Create an unsubscribe request frame
226    /// Payload format: topic_id(4) + consumer_id(8)
227    pub fn new_unsubscribe(topic_id: u32, consumer_id: u64) -> Self {
228        let mut payload = Vec::with_capacity(12);
229        payload.extend_from_slice(&topic_id.to_le_bytes());
230        payload.extend_from_slice(&consumer_id.to_le_bytes());
231        Self::new_control(ControlCommand::Unsubscribe, Some(Bytes::from(payload)))
232    }
233
234    /// Create a commit offset request frame
235    /// Payload format: topic_id(4) + consumer_id(8) + offset(8)
236    pub fn new_commit_offset(topic_id: u32, consumer_id: u64, offset: u64) -> Self {
237        let mut payload = Vec::with_capacity(20);
238        payload.extend_from_slice(&topic_id.to_le_bytes());
239        payload.extend_from_slice(&consumer_id.to_le_bytes());
240        payload.extend_from_slice(&offset.to_le_bytes());
241        Self::new_control(ControlCommand::CommitOffset, Some(Bytes::from(payload)))
242    }
243
244    /// Create a subscribe acknowledgment frame
245    /// Payload format: consumer_id(8) + start_offset(8)
246    pub fn new_subscribe_ack(consumer_id: u64, start_offset: u64) -> Self {
247        let mut payload = Vec::with_capacity(16);
248        payload.extend_from_slice(&consumer_id.to_le_bytes());
249        payload.extend_from_slice(&start_offset.to_le_bytes());
250        Self::new_control(ControlCommand::SubscribeAck, Some(Bytes::from(payload)))
251    }
252
253    /// Create a commit offset acknowledgment frame
254    /// Payload format: consumer_id(8) + committed_offset(8)
255    pub fn new_commit_ack(consumer_id: u64, committed_offset: u64) -> Self {
256        let mut payload = Vec::with_capacity(16);
257        payload.extend_from_slice(&consumer_id.to_le_bytes());
258        payload.extend_from_slice(&committed_offset.to_le_bytes());
259        Self::new_control(ControlCommand::CommitAck, Some(Bytes::from(payload)))
260    }
261
262    pub fn new_topic_response(payload: Bytes) -> Self {
263        Self::new_control(ControlCommand::TopicResponse, Some(payload))
264    }
265
266    pub fn new_error_response(message: &str) -> Self {
267        Self::new_control(
268            ControlCommand::ErrorResponse,
269            Some(Bytes::copy_from_slice(message.as_bytes())),
270        )
271    }
272
273    #[inline]
274    #[must_use]
275    pub fn batch_id(&self) -> u64 {
276        self.header.ingest_header.batch_id
277    }
278
279    #[inline]
280    #[must_use]
281    pub fn payload_length(&self) -> u32 {
282        self.header.ingest_header.payload_length
283    }
284
285    #[inline]
286    #[must_use]
287    pub fn record_count(&self) -> u32 {
288        self.header.ingest_header.record_count
289    }
290
291    #[inline]
292    #[must_use]
293    pub fn topic_id(&self) -> u32 {
294        self.header.ingest_header.topic_id
295    }
296}
297
298pub fn parse_frame(buf: &[u8]) -> Result<Option<(Frame, usize)>> {
299    if buf.len() < LWP_HEADER_SIZE {
300        return Ok(None);
301    }
302
303    let header = LwpHeader::parse(buf)?;
304
305    let frame_type = determine_frame_type(&header);
306    let payload_len = header.ingest_header.payload_length as usize;
307    let total_len = LWP_HEADER_SIZE + payload_len;
308
309    if buf.len() < total_len {
310        return Ok(None);
311    }
312
313    let payload = if payload_len > 0 {
314        let payload_bytes = &buf[LWP_HEADER_SIZE..total_len];
315        header.ingest_header.validate_payload(payload_bytes)?;
316        Some(Bytes::copy_from_slice(payload_bytes))
317    } else {
318        None
319    };
320
321    let frame = Frame {
322        header,
323        payload,
324        frame_type,
325    };
326
327    Ok(Some((frame, total_len)))
328}
329
330pub fn encode_frame(frame: &Frame) -> Vec<u8> {
331    let header_bytes = frame.header.encode();
332    let payload_len = frame.payload.as_ref().map_or(0, |p| p.len());
333
334    let mut buf = Vec::with_capacity(LWP_HEADER_SIZE + payload_len);
335    buf.extend_from_slice(&header_bytes);
336
337    if let Some(ref payload) = frame.payload {
338        buf.extend_from_slice(payload);
339    }
340
341    buf
342}
343
344fn determine_frame_type(header: &LwpHeader) -> FrameType {
345    if header.has_flag(LwpFlags::Control) {
346        let command = ControlCommand::from(header.ingest_header.batch_id as u8);
347        FrameType::Control(command)
348    } else if header.has_flag(LwpFlags::Keepalive) {
349        FrameType::Keepalive
350    } else if header.has_flag(LwpFlags::Ack) {
351        FrameType::Ack
352    } else if header.has_flag(LwpFlags::Backpressure) {
353        FrameType::Backpressure
354    } else if header.has_flag(LwpFlags::BatchMode) || header.ingest_header.payload_length > 0 {
355        FrameType::Ingest
356    } else {
357        FrameType::Unknown
358    }
359}
360
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Encodes `frame`, parses the bytes back, and returns
    /// `(parsed frame, bytes consumed by the parser, encoded length)`.
    fn roundtrip(frame: &Frame) -> (Frame, usize, usize) {
        let wire = encode_frame(frame);
        let (decoded, consumed) = parse_frame(&wire).unwrap().unwrap();
        (decoded, consumed, wire.len())
    }

    /// Reads a little-endian `u32` starting at `offset`.
    fn le_u32(bytes: &[u8], offset: usize) -> u32 {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(&bytes[offset..offset + 4]);
        u32::from_le_bytes(raw)
    }

    /// Reads a little-endian `u64` starting at `offset`.
    fn le_u64(bytes: &[u8], offset: usize) -> u64 {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&bytes[offset..offset + 8]);
        u64::from_le_bytes(raw)
    }

    #[test]
    fn test_ingest_frame_roundtrip() {
        let body = Bytes::from_static(b"test payload data");
        let original = Frame::new_ingest(12345, 1_000_000_000, 1, body.clone());

        let (parsed, consumed, wire_len) = roundtrip(&original);

        assert_eq!(consumed, wire_len);
        assert_eq!(parsed.frame_type, FrameType::Ingest);
        assert_eq!(parsed.batch_id(), 12345);
        assert_eq!(parsed.payload.unwrap(), body);
    }

    #[test]
    fn test_ack_frame_roundtrip() {
        let (parsed, _, _) = roundtrip(&Frame::new_ack(54321));

        assert_eq!(parsed.frame_type, FrameType::Ack);
        assert_eq!(parsed.batch_id(), 54321);
        assert!(parsed.payload.is_none());
    }

    #[test]
    fn test_keepalive_frame() {
        let (parsed, _, _) = roundtrip(&Frame::new_keepalive());

        assert_eq!(parsed.frame_type, FrameType::Keepalive);
    }

    #[test]
    fn test_partial_frame() {
        let wire = encode_frame(&Frame::new_ingest(1, 0, 1, Bytes::from_static(b"test")));

        // One byte short of a full header.
        let truncated_header = parse_frame(&wire[..LWP_HEADER_SIZE - 1]).unwrap();
        assert!(truncated_header.is_none());

        // Full header but only part of the 4-byte payload.
        let truncated_payload = parse_frame(&wire[..LWP_HEADER_SIZE + 2]).unwrap();
        assert!(truncated_payload.is_none());
    }

    // -------------------------------------------------------------------------
    // Streaming control frame tests
    // -------------------------------------------------------------------------

    #[test]
    fn test_subscribe_frame_roundtrip() {
        let topic_id = 42u32;
        let start_offset = 1000u64;
        let max_batch_bytes = 65536u32;
        let consumer_id = 0xDEADBEEF12345678u64;

        let (parsed, consumed, wire_len) = roundtrip(&Frame::new_subscribe(
            topic_id,
            start_offset,
            max_batch_bytes,
            consumer_id,
        ));

        assert_eq!(consumed, wire_len);
        assert_eq!(
            parsed.frame_type,
            FrameType::Control(ControlCommand::Subscribe)
        );

        // Layout: topic_id(4) + start_offset(8) + max_batch_bytes(4) + consumer_id(8)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 24);

        assert_eq!(le_u32(&payload, 0), topic_id);
        assert_eq!(le_u64(&payload, 4), start_offset);
        assert_eq!(le_u32(&payload, 12), max_batch_bytes);
        assert_eq!(le_u64(&payload, 16), consumer_id);
    }

    #[test]
    fn test_unsubscribe_frame_roundtrip() {
        let topic_id = 7u32;
        let consumer_id = 0xCAFEBABE00000001u64;

        let (parsed, consumed, wire_len) =
            roundtrip(&Frame::new_unsubscribe(topic_id, consumer_id));

        assert_eq!(consumed, wire_len);
        assert_eq!(
            parsed.frame_type,
            FrameType::Control(ControlCommand::Unsubscribe)
        );

        // Layout: topic_id(4) + consumer_id(8)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 12);

        assert_eq!(le_u32(&payload, 0), topic_id);
        assert_eq!(le_u64(&payload, 4), consumer_id);
    }

    #[test]
    fn test_commit_offset_frame_roundtrip() {
        let topic_id = 99u32;
        let consumer_id = 0x1234567890ABCDEFu64;
        let offset = 50000u64;

        let (parsed, consumed, wire_len) =
            roundtrip(&Frame::new_commit_offset(topic_id, consumer_id, offset));

        assert_eq!(consumed, wire_len);
        assert_eq!(
            parsed.frame_type,
            FrameType::Control(ControlCommand::CommitOffset)
        );

        // Layout: topic_id(4) + consumer_id(8) + offset(8)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 20);

        assert_eq!(le_u32(&payload, 0), topic_id);
        assert_eq!(le_u64(&payload, 4), consumer_id);
        assert_eq!(le_u64(&payload, 12), offset);
    }

    #[test]
    fn test_subscribe_ack_frame_roundtrip() {
        let consumer_id = 0xABCDEF0123456789u64;
        let start_offset = 12345u64;

        let (parsed, consumed, wire_len) =
            roundtrip(&Frame::new_subscribe_ack(consumer_id, start_offset));

        assert_eq!(consumed, wire_len);
        assert_eq!(
            parsed.frame_type,
            FrameType::Control(ControlCommand::SubscribeAck)
        );

        // Layout: consumer_id(8) + start_offset(8)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 16);

        assert_eq!(le_u64(&payload, 0), consumer_id);
        assert_eq!(le_u64(&payload, 8), start_offset);
    }

    #[test]
    fn test_commit_ack_frame_roundtrip() {
        let consumer_id = 0xFEDCBA9876543210u64;
        let committed_offset = 99999u64;

        let (parsed, consumed, wire_len) =
            roundtrip(&Frame::new_commit_ack(consumer_id, committed_offset));

        assert_eq!(consumed, wire_len);
        assert_eq!(
            parsed.frame_type,
            FrameType::Control(ControlCommand::CommitAck)
        );

        // Layout: consumer_id(8) + committed_offset(8)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 16);

        assert_eq!(le_u64(&payload, 0), consumer_id);
        assert_eq!(le_u64(&payload, 8), committed_offset);
    }

    #[test]
    fn test_fetch_frame_roundtrip() {
        let topic_id = 5u32;
        let start_offset = 1024u64;
        let max_bytes = 32768u32;

        let (parsed, consumed, wire_len) =
            roundtrip(&Frame::new_fetch(topic_id, start_offset, max_bytes));

        assert_eq!(consumed, wire_len);
        assert_eq!(parsed.frame_type, FrameType::Control(ControlCommand::Fetch));

        // Layout: topic_id(4) + start_offset(8) + max_bytes(4)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 16);

        assert_eq!(le_u32(&payload, 0), topic_id);
        assert_eq!(le_u64(&payload, 4), start_offset);
        assert_eq!(le_u32(&payload, 12), max_bytes);
    }
}