Skip to main content

lnc_network/
frame.rs

1use crate::LWP_HEADER_SIZE;
2use crate::protocol::{ControlCommand, IngestHeader, LwpFlags, LwpHeader};
3use bytes::Bytes;
4use lnc_core::{Result, crc32c};
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum FrameType {
8    Ingest,
9    Ack,
10    Backpressure,
11    Keepalive,
12    Control(ControlCommand),
13    Unknown,
14}
15
16pub struct Frame {
17    pub header: LwpHeader,
18    pub payload: Option<Bytes>,
19    pub frame_type: FrameType,
20}
21
22impl Frame {
23    pub fn new_ingest(batch_id: u64, timestamp_ns: u64, record_count: u32, payload: Bytes) -> Self {
24        Self::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, 0)
25    }
26
27    pub fn new_ingest_with_topic(
28        batch_id: u64,
29        timestamp_ns: u64,
30        record_count: u32,
31        payload: Bytes,
32        topic_id: u32,
33    ) -> Self {
34        let payload_crc = crc32c(&payload);
35        let ingest = IngestHeader::with_topic(
36            batch_id,
37            timestamp_ns,
38            record_count,
39            payload.len() as u32,
40            payload_crc,
41            topic_id,
42        );
43        let header = LwpHeader::new(LwpFlags::BatchMode as u8, ingest);
44
45        Self {
46            header,
47            payload: Some(payload),
48            frame_type: FrameType::Ingest,
49        }
50    }
51
52    pub fn new_ack(batch_id: u64) -> Self {
53        let ingest = IngestHeader {
54            batch_id,
55            ..Default::default()
56        };
57        let header = LwpHeader::new(LwpFlags::Ack as u8, ingest);
58
59        Self {
60            header,
61            payload: None,
62            frame_type: FrameType::Ack,
63        }
64    }
65
66    pub fn new_keepalive() -> Self {
67        let header = LwpHeader::new(LwpFlags::Keepalive as u8, IngestHeader::default());
68
69        Self {
70            header,
71            payload: None,
72            frame_type: FrameType::Keepalive,
73        }
74    }
75
76    pub fn new_backpressure() -> Self {
77        let header = LwpHeader::new(LwpFlags::Backpressure as u8, IngestHeader::default());
78
79        Self {
80            header,
81            payload: None,
82            frame_type: FrameType::Backpressure,
83        }
84    }
85
86    pub fn new_control(command: ControlCommand, payload: Option<Bytes>) -> Self {
87        let (payload_len, payload_crc) = match &payload {
88            Some(p) => (p.len() as u32, crc32c(p)),
89            None => (0, 0),
90        };
91
92        let ingest = IngestHeader {
93            payload_length: payload_len,
94            payload_crc,
95            batch_id: command as u64,
96            ..Default::default()
97        };
98
99        let header = LwpHeader::new(LwpFlags::Control as u8, ingest);
100
101        Self {
102            header,
103            payload,
104            frame_type: FrameType::Control(command),
105        }
106    }
107
108    pub fn new_create_topic(topic_name: &str) -> Self {
109        Self::new_control(
110            ControlCommand::CreateTopic,
111            Some(Bytes::copy_from_slice(topic_name.as_bytes())),
112        )
113    }
114
115    pub fn new_list_topics() -> Self {
116        Self::new_control(ControlCommand::ListTopics, None)
117    }
118
119    pub fn new_get_topic(topic_id: u32) -> Self {
120        Self::new_control(
121            ControlCommand::GetTopic,
122            Some(Bytes::copy_from_slice(&topic_id.to_le_bytes())),
123        )
124    }
125
126    pub fn new_delete_topic(topic_id: u32) -> Self {
127        Self::new_control(
128            ControlCommand::DeleteTopic,
129            Some(Bytes::copy_from_slice(&topic_id.to_le_bytes())),
130        )
131    }
132
133    /// Create a set retention policy request frame
134    /// Payload format: topic_id(4) + max_age_secs(8) + max_bytes(8)
135    pub fn new_set_retention(topic_id: u32, max_age_secs: u64, max_bytes: u64) -> Self {
136        let mut payload = Vec::with_capacity(20);
137        payload.extend_from_slice(&topic_id.to_le_bytes());
138        payload.extend_from_slice(&max_age_secs.to_le_bytes());
139        payload.extend_from_slice(&max_bytes.to_le_bytes());
140        Self::new_control(ControlCommand::SetRetention, Some(Bytes::from(payload)))
141    }
142
143    /// Create a topic with retention configuration
144    /// Payload format: name_len(2) + name(var) + max_age_secs(8) + max_bytes(8)
145    pub fn new_create_topic_with_retention(
146        topic_name: &str,
147        max_age_secs: u64,
148        max_bytes: u64,
149    ) -> Self {
150        let name_bytes = topic_name.as_bytes();
151        let mut payload = Vec::with_capacity(2 + name_bytes.len() + 16);
152        payload.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
153        payload.extend_from_slice(name_bytes);
154        payload.extend_from_slice(&max_age_secs.to_le_bytes());
155        payload.extend_from_slice(&max_bytes.to_le_bytes());
156        Self::new_control(
157            ControlCommand::CreateTopicWithRetention,
158            Some(Bytes::from(payload)),
159        )
160    }
161
162    /// Create a fetch request frame
163    /// Payload format: topic_id(4) + start_offset(8) + max_bytes(4)
164    pub fn new_fetch(topic_id: u32, start_offset: u64, max_bytes: u32) -> Self {
165        let mut payload = Vec::with_capacity(16);
166        payload.extend_from_slice(&topic_id.to_le_bytes());
167        payload.extend_from_slice(&start_offset.to_le_bytes());
168        payload.extend_from_slice(&max_bytes.to_le_bytes());
169        Self::new_control(ControlCommand::Fetch, Some(Bytes::from(payload)))
170    }
171
172    pub fn new_fetch_response(payload: Bytes) -> Self {
173        Self::new_control(ControlCommand::FetchResponse, Some(payload))
174    }
175
176    /// Create a catching-up response frame indicating the server has not yet
177    /// replicated to the requested offset. Payload contains the current max
178    /// offset (8 bytes LE u64) so the client knows the server's progress.
179    pub fn new_catching_up(current_max_offset: u64) -> Self {
180        Self::new_control(
181            ControlCommand::CatchingUp,
182            Some(Bytes::copy_from_slice(&current_max_offset.to_le_bytes())),
183        )
184    }
185
186    /// Create a get cluster status request frame
187    pub fn new_get_cluster_status() -> Self {
188        Self::new_control(ControlCommand::GetClusterStatus, None)
189    }
190
191    /// Create a cluster status response frame
192    pub fn new_cluster_status_response(payload: Bytes) -> Self {
193        Self::new_control(ControlCommand::ClusterStatusResponse, Some(payload))
194    }
195
196    /// Create an authenticate request frame
197    /// Payload format: token (UTF-8 string bytes)
198    pub fn new_authenticate(token: &str) -> Self {
199        Self::new_control(
200            ControlCommand::Authenticate,
201            Some(Bytes::copy_from_slice(token.as_bytes())),
202        )
203    }
204
205    /// Create an authenticate response frame
206    /// Payload format: success(1) + optional message
207    pub fn new_authenticate_response(success: bool, message: Option<&str>) -> Self {
208        let mut payload = Vec::with_capacity(1 + message.map_or(0, |m| m.len()));
209        payload.push(if success { 1 } else { 0 });
210        if let Some(msg) = message {
211            payload.extend_from_slice(msg.as_bytes());
212        }
213        Self::new_control(
214            ControlCommand::AuthenticateResponse,
215            Some(Bytes::from(payload)),
216        )
217    }
218
219    /// Create a subscribe request frame
220    /// Payload format: topic_id(4) + start_offset(8) + max_batch_bytes(4) + consumer_id(8)
221    pub fn new_subscribe(
222        topic_id: u32,
223        start_offset: u64,
224        max_batch_bytes: u32,
225        consumer_id: u64,
226    ) -> Self {
227        let mut payload = Vec::with_capacity(24);
228        payload.extend_from_slice(&topic_id.to_le_bytes());
229        payload.extend_from_slice(&start_offset.to_le_bytes());
230        payload.extend_from_slice(&max_batch_bytes.to_le_bytes());
231        payload.extend_from_slice(&consumer_id.to_le_bytes());
232        Self::new_control(ControlCommand::Subscribe, Some(Bytes::from(payload)))
233    }
234
235    /// Create an unsubscribe request frame
236    /// Payload format: topic_id(4) + consumer_id(8)
237    pub fn new_unsubscribe(topic_id: u32, consumer_id: u64) -> Self {
238        let mut payload = Vec::with_capacity(12);
239        payload.extend_from_slice(&topic_id.to_le_bytes());
240        payload.extend_from_slice(&consumer_id.to_le_bytes());
241        Self::new_control(ControlCommand::Unsubscribe, Some(Bytes::from(payload)))
242    }
243
244    /// Create a commit offset request frame
245    /// Payload format: topic_id(4) + consumer_id(8) + offset(8)
246    pub fn new_commit_offset(topic_id: u32, consumer_id: u64, offset: u64) -> Self {
247        let mut payload = Vec::with_capacity(20);
248        payload.extend_from_slice(&topic_id.to_le_bytes());
249        payload.extend_from_slice(&consumer_id.to_le_bytes());
250        payload.extend_from_slice(&offset.to_le_bytes());
251        Self::new_control(ControlCommand::CommitOffset, Some(Bytes::from(payload)))
252    }
253
254    /// Create a subscribe acknowledgment frame
255    /// Payload format: consumer_id(8) + start_offset(8)
256    pub fn new_subscribe_ack(consumer_id: u64, start_offset: u64) -> Self {
257        let mut payload = Vec::with_capacity(16);
258        payload.extend_from_slice(&consumer_id.to_le_bytes());
259        payload.extend_from_slice(&start_offset.to_le_bytes());
260        Self::new_control(ControlCommand::SubscribeAck, Some(Bytes::from(payload)))
261    }
262
263    /// Create a commit offset acknowledgment frame
264    /// Payload format: consumer_id(8) + committed_offset(8)
265    pub fn new_commit_ack(consumer_id: u64, committed_offset: u64) -> Self {
266        let mut payload = Vec::with_capacity(16);
267        payload.extend_from_slice(&consumer_id.to_le_bytes());
268        payload.extend_from_slice(&committed_offset.to_le_bytes());
269        Self::new_control(ControlCommand::CommitAck, Some(Bytes::from(payload)))
270    }
271
272    pub fn new_topic_response(payload: Bytes) -> Self {
273        Self::new_control(ControlCommand::TopicResponse, Some(payload))
274    }
275
276    pub fn new_error_response(message: &str) -> Self {
277        Self::new_control(
278            ControlCommand::ErrorResponse,
279            Some(Bytes::copy_from_slice(message.as_bytes())),
280        )
281    }
282
283    #[inline]
284    #[must_use]
285    pub fn batch_id(&self) -> u64 {
286        self.header.ingest_header.batch_id
287    }
288
289    #[inline]
290    #[must_use]
291    pub fn payload_length(&self) -> u32 {
292        self.header.ingest_header.payload_length
293    }
294
295    #[inline]
296    #[must_use]
297    pub fn record_count(&self) -> u32 {
298        self.header.ingest_header.record_count
299    }
300
301    #[inline]
302    #[must_use]
303    pub fn topic_id(&self) -> u32 {
304        self.header.ingest_header.topic_id
305    }
306}
307
308pub fn parse_frame(buf: &[u8]) -> Result<Option<(Frame, usize)>> {
309    if buf.len() < LWP_HEADER_SIZE {
310        return Ok(None);
311    }
312
313    let header = LwpHeader::parse(buf)?;
314
315    let frame_type = determine_frame_type(&header);
316    let payload_len = header.ingest_header.payload_length as usize;
317    let total_len = LWP_HEADER_SIZE + payload_len;
318
319    if buf.len() < total_len {
320        return Ok(None);
321    }
322
323    let payload = if payload_len > 0 {
324        let payload_bytes = &buf[LWP_HEADER_SIZE..total_len];
325        header.ingest_header.validate_payload(payload_bytes)?;
326        Some(Bytes::copy_from_slice(payload_bytes))
327    } else {
328        None
329    };
330
331    let frame = Frame {
332        header,
333        payload,
334        frame_type,
335    };
336
337    Ok(Some((frame, total_len)))
338}
339
340/// Encode an ACK frame directly into a stack-allocated buffer.
341/// Avoids the `Vec<u8>` allocation that `encode_frame` requires.
342#[inline]
343pub fn encode_ack_bytes(batch_id: u64) -> [u8; crate::LWP_HEADER_SIZE] {
344    let frame = Frame::new_ack(batch_id);
345    frame.header.encode()
346}
347
348/// Encode a Backpressure frame (0x10) directly into a stack-allocated buffer.
349///
350/// **Spec ยง4.4**: Followers return this when forward_write times out or the
351/// connection pool is exhausted, signaling the client to back off.
352#[inline]
353pub fn encode_backpressure_bytes() -> [u8; crate::LWP_HEADER_SIZE] {
354    let frame = Frame::new_backpressure();
355    frame.header.encode()
356}
357
358pub fn encode_frame(frame: &Frame) -> Vec<u8> {
359    let header_bytes = frame.header.encode();
360    let payload_len = frame.payload.as_ref().map_or(0, |p| p.len());
361
362    let mut buf = Vec::with_capacity(LWP_HEADER_SIZE + payload_len);
363    buf.extend_from_slice(&header_bytes);
364
365    if let Some(ref payload) = frame.payload {
366        buf.extend_from_slice(payload);
367    }
368
369    buf
370}
371
372fn determine_frame_type(header: &LwpHeader) -> FrameType {
373    if header.has_flag(LwpFlags::Control) {
374        let command = ControlCommand::from(header.ingest_header.batch_id as u8);
375        FrameType::Control(command)
376    } else if header.has_flag(LwpFlags::Keepalive) {
377        FrameType::Keepalive
378    } else if header.has_flag(LwpFlags::Ack) {
379        FrameType::Ack
380    } else if header.has_flag(LwpFlags::Backpressure) {
381        FrameType::Backpressure
382    } else if header.has_flag(LwpFlags::BatchMode) || header.ingest_header.payload_length > 0 {
383        FrameType::Ingest
384    } else {
385        FrameType::Unknown
386    }
387}
388
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Encode `frame`, parse it back, and return the parsed frame, the number
    /// of bytes the parser consumed, and the encoded length.
    fn roundtrip(frame: &Frame) -> (Frame, usize, usize) {
        let wire = encode_frame(frame);
        let (decoded, consumed) = parse_frame(&wire).unwrap().unwrap();
        (decoded, consumed, wire.len())
    }

    /// Read a little-endian `u32` starting at byte offset `at`.
    fn le_u32(bytes: &[u8], at: usize) -> u32 {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(&bytes[at..at + 4]);
        u32::from_le_bytes(raw)
    }

    /// Read a little-endian `u64` starting at byte offset `at`.
    fn le_u64(bytes: &[u8], at: usize) -> u64 {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&bytes[at..at + 8]);
        u64::from_le_bytes(raw)
    }

    #[test]
    fn test_ingest_frame_roundtrip() {
        let payload = Bytes::from_static(b"test payload data");
        let original = Frame::new_ingest(12345, 1_000_000_000, 1, payload.clone());

        let (parsed, consumed, wire_len) = roundtrip(&original);

        assert_eq!(consumed, wire_len);
        assert_eq!(parsed.frame_type, FrameType::Ingest);
        assert_eq!(parsed.batch_id(), 12345);
        assert_eq!(parsed.payload.unwrap(), payload);
    }

    #[test]
    fn test_ack_frame_roundtrip() {
        let (parsed, _, _) = roundtrip(&Frame::new_ack(54321));

        assert_eq!(parsed.frame_type, FrameType::Ack);
        assert_eq!(parsed.batch_id(), 54321);
        assert!(parsed.payload.is_none());
    }

    #[test]
    fn test_keepalive_frame() {
        let (parsed, _, _) = roundtrip(&Frame::new_keepalive());

        assert_eq!(parsed.frame_type, FrameType::Keepalive);
    }

    #[test]
    fn test_partial_frame() {
        let original = Frame::new_ingest(1, 0, 1, Bytes::from_static(b"test"));
        let wire = encode_frame(&original);

        // One byte short of a full header.
        let short_header = parse_frame(&wire[..LWP_HEADER_SIZE - 1]).unwrap();
        assert!(short_header.is_none());

        // Full header but only part of the payload.
        let short_payload = parse_frame(&wire[..LWP_HEADER_SIZE + 2]).unwrap();
        assert!(short_payload.is_none());
    }

    // =========================================================================
    // Streaming Control Frame Tests
    // =========================================================================

    #[test]
    fn test_subscribe_frame_roundtrip() {
        let topic_id = 42u32;
        let start_offset = 1000u64;
        let max_batch_bytes = 65536u32;
        let consumer_id = 0xDEADBEEF12345678u64;

        let original = Frame::new_subscribe(topic_id, start_offset, max_batch_bytes, consumer_id);
        let (parsed, consumed, wire_len) = roundtrip(&original);

        assert_eq!(consumed, wire_len);
        assert_eq!(
            parsed.frame_type,
            FrameType::Control(ControlCommand::Subscribe)
        );

        // Layout: topic_id(4) + start_offset(8) + max_batch_bytes(4) + consumer_id(8)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 24);

        assert_eq!(le_u32(&payload, 0), topic_id);
        assert_eq!(le_u64(&payload, 4), start_offset);
        assert_eq!(le_u32(&payload, 12), max_batch_bytes);
        assert_eq!(le_u64(&payload, 16), consumer_id);
    }

    #[test]
    fn test_unsubscribe_frame_roundtrip() {
        let topic_id = 7u32;
        let consumer_id = 0xCAFEBABE00000001u64;

        let original = Frame::new_unsubscribe(topic_id, consumer_id);
        let (parsed, consumed, wire_len) = roundtrip(&original);

        assert_eq!(consumed, wire_len);
        assert_eq!(
            parsed.frame_type,
            FrameType::Control(ControlCommand::Unsubscribe)
        );

        // Layout: topic_id(4) + consumer_id(8)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 12);

        assert_eq!(le_u32(&payload, 0), topic_id);
        assert_eq!(le_u64(&payload, 4), consumer_id);
    }

    #[test]
    fn test_commit_offset_frame_roundtrip() {
        let topic_id = 99u32;
        let consumer_id = 0x1234567890ABCDEFu64;
        let offset = 50000u64;

        let original = Frame::new_commit_offset(topic_id, consumer_id, offset);
        let (parsed, consumed, wire_len) = roundtrip(&original);

        assert_eq!(consumed, wire_len);
        assert_eq!(
            parsed.frame_type,
            FrameType::Control(ControlCommand::CommitOffset)
        );

        // Layout: topic_id(4) + consumer_id(8) + offset(8)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 20);

        assert_eq!(le_u32(&payload, 0), topic_id);
        assert_eq!(le_u64(&payload, 4), consumer_id);
        assert_eq!(le_u64(&payload, 12), offset);
    }

    #[test]
    fn test_subscribe_ack_frame_roundtrip() {
        let consumer_id = 0xABCDEF0123456789u64;
        let start_offset = 12345u64;

        let original = Frame::new_subscribe_ack(consumer_id, start_offset);
        let (parsed, consumed, wire_len) = roundtrip(&original);

        assert_eq!(consumed, wire_len);
        assert_eq!(
            parsed.frame_type,
            FrameType::Control(ControlCommand::SubscribeAck)
        );

        // Layout: consumer_id(8) + start_offset(8)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 16);

        assert_eq!(le_u64(&payload, 0), consumer_id);
        assert_eq!(le_u64(&payload, 8), start_offset);
    }

    #[test]
    fn test_commit_ack_frame_roundtrip() {
        let consumer_id = 0xFEDCBA9876543210u64;
        let committed_offset = 99999u64;

        let original = Frame::new_commit_ack(consumer_id, committed_offset);
        let (parsed, consumed, wire_len) = roundtrip(&original);

        assert_eq!(consumed, wire_len);
        assert_eq!(
            parsed.frame_type,
            FrameType::Control(ControlCommand::CommitAck)
        );

        // Layout: consumer_id(8) + committed_offset(8)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 16);

        assert_eq!(le_u64(&payload, 0), consumer_id);
        assert_eq!(le_u64(&payload, 8), committed_offset);
    }

    #[test]
    fn test_fetch_frame_roundtrip() {
        let topic_id = 5u32;
        let start_offset = 1024u64;
        let max_bytes = 32768u32;

        let original = Frame::new_fetch(topic_id, start_offset, max_bytes);
        let (parsed, consumed, wire_len) = roundtrip(&original);

        assert_eq!(consumed, wire_len);
        assert_eq!(parsed.frame_type, FrameType::Control(ControlCommand::Fetch));

        // Layout: topic_id(4) + start_offset(8) + max_bytes(4)
        let payload = parsed.payload.unwrap();
        assert_eq!(payload.len(), 16);

        assert_eq!(le_u32(&payload, 0), topic_id);
        assert_eq!(le_u64(&payload, 4), start_offset);
        assert_eq!(le_u32(&payload, 12), max_bytes);
    }
}