1use crate::LWP_HEADER_SIZE;
2use crate::protocol::{ControlCommand, IngestHeader, LwpFlags, LwpHeader};
3use bytes::Bytes;
4use lnc_core::{Result, crc32c};
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum FrameType {
8 Ingest,
9 Ack,
10 Backpressure,
11 Keepalive,
12 Control(ControlCommand),
13 Unknown,
14}
15
16pub struct Frame {
17 pub header: LwpHeader,
18 pub payload: Option<Bytes>,
19 pub frame_type: FrameType,
20}
21
22impl Frame {
23 pub fn new_ingest(batch_id: u64, timestamp_ns: u64, record_count: u32, payload: Bytes) -> Self {
24 Self::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, 0)
25 }
26
27 pub fn new_ingest_with_topic(
28 batch_id: u64,
29 timestamp_ns: u64,
30 record_count: u32,
31 payload: Bytes,
32 topic_id: u32,
33 ) -> Self {
34 let payload_crc = crc32c(&payload);
35 let ingest = IngestHeader::with_topic(
36 batch_id,
37 timestamp_ns,
38 record_count,
39 payload.len() as u32,
40 payload_crc,
41 topic_id,
42 );
43 let header = LwpHeader::new(LwpFlags::BatchMode as u8, ingest);
44
45 Self {
46 header,
47 payload: Some(payload),
48 frame_type: FrameType::Ingest,
49 }
50 }
51
52 pub fn new_ack(batch_id: u64) -> Self {
53 let ingest = IngestHeader {
54 batch_id,
55 ..Default::default()
56 };
57 let header = LwpHeader::new(LwpFlags::Ack as u8, ingest);
58
59 Self {
60 header,
61 payload: None,
62 frame_type: FrameType::Ack,
63 }
64 }
65
66 pub fn new_keepalive() -> Self {
67 let header = LwpHeader::new(LwpFlags::Keepalive as u8, IngestHeader::default());
68
69 Self {
70 header,
71 payload: None,
72 frame_type: FrameType::Keepalive,
73 }
74 }
75
76 pub fn new_backpressure() -> Self {
77 let header = LwpHeader::new(LwpFlags::Backpressure as u8, IngestHeader::default());
78
79 Self {
80 header,
81 payload: None,
82 frame_type: FrameType::Backpressure,
83 }
84 }
85
86 pub fn new_control(command: ControlCommand, payload: Option<Bytes>) -> Self {
87 let (payload_len, payload_crc) = match &payload {
88 Some(p) => (p.len() as u32, crc32c(p)),
89 None => (0, 0),
90 };
91
92 let ingest = IngestHeader {
93 payload_length: payload_len,
94 payload_crc,
95 batch_id: command as u64,
96 ..Default::default()
97 };
98
99 let header = LwpHeader::new(LwpFlags::Control as u8, ingest);
100
101 Self {
102 header,
103 payload,
104 frame_type: FrameType::Control(command),
105 }
106 }
107
108 pub fn new_create_topic(topic_name: &str) -> Self {
109 Self::new_control(
110 ControlCommand::CreateTopic,
111 Some(Bytes::copy_from_slice(topic_name.as_bytes())),
112 )
113 }
114
115 pub fn new_list_topics() -> Self {
116 Self::new_control(ControlCommand::ListTopics, None)
117 }
118
119 pub fn new_get_topic(topic_id: u32) -> Self {
120 Self::new_control(
121 ControlCommand::GetTopic,
122 Some(Bytes::copy_from_slice(&topic_id.to_le_bytes())),
123 )
124 }
125
126 pub fn new_delete_topic(topic_id: u32) -> Self {
127 Self::new_control(
128 ControlCommand::DeleteTopic,
129 Some(Bytes::copy_from_slice(&topic_id.to_le_bytes())),
130 )
131 }
132
133 pub fn new_set_retention(topic_id: u32, max_age_secs: u64, max_bytes: u64) -> Self {
136 let mut payload = Vec::with_capacity(20);
137 payload.extend_from_slice(&topic_id.to_le_bytes());
138 payload.extend_from_slice(&max_age_secs.to_le_bytes());
139 payload.extend_from_slice(&max_bytes.to_le_bytes());
140 Self::new_control(ControlCommand::SetRetention, Some(Bytes::from(payload)))
141 }
142
143 pub fn new_create_topic_with_retention(
146 topic_name: &str,
147 max_age_secs: u64,
148 max_bytes: u64,
149 ) -> Self {
150 let name_bytes = topic_name.as_bytes();
151 let mut payload = Vec::with_capacity(2 + name_bytes.len() + 16);
152 payload.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
153 payload.extend_from_slice(name_bytes);
154 payload.extend_from_slice(&max_age_secs.to_le_bytes());
155 payload.extend_from_slice(&max_bytes.to_le_bytes());
156 Self::new_control(
157 ControlCommand::CreateTopicWithRetention,
158 Some(Bytes::from(payload)),
159 )
160 }
161
162 pub fn new_fetch(topic_id: u32, start_offset: u64, max_bytes: u32) -> Self {
165 let mut payload = Vec::with_capacity(16);
166 payload.extend_from_slice(&topic_id.to_le_bytes());
167 payload.extend_from_slice(&start_offset.to_le_bytes());
168 payload.extend_from_slice(&max_bytes.to_le_bytes());
169 Self::new_control(ControlCommand::Fetch, Some(Bytes::from(payload)))
170 }
171
172 pub fn new_fetch_response(payload: Bytes) -> Self {
173 Self::new_control(ControlCommand::FetchResponse, Some(payload))
174 }
175
176 pub fn new_catching_up(current_max_offset: u64) -> Self {
180 Self::new_control(
181 ControlCommand::CatchingUp,
182 Some(Bytes::copy_from_slice(¤t_max_offset.to_le_bytes())),
183 )
184 }
185
186 pub fn new_get_cluster_status() -> Self {
188 Self::new_control(ControlCommand::GetClusterStatus, None)
189 }
190
191 pub fn new_cluster_status_response(payload: Bytes) -> Self {
193 Self::new_control(ControlCommand::ClusterStatusResponse, Some(payload))
194 }
195
196 pub fn new_authenticate(token: &str) -> Self {
199 Self::new_control(
200 ControlCommand::Authenticate,
201 Some(Bytes::copy_from_slice(token.as_bytes())),
202 )
203 }
204
205 pub fn new_authenticate_response(success: bool, message: Option<&str>) -> Self {
208 let mut payload = Vec::with_capacity(1 + message.map_or(0, |m| m.len()));
209 payload.push(if success { 1 } else { 0 });
210 if let Some(msg) = message {
211 payload.extend_from_slice(msg.as_bytes());
212 }
213 Self::new_control(
214 ControlCommand::AuthenticateResponse,
215 Some(Bytes::from(payload)),
216 )
217 }
218
219 pub fn new_subscribe(
222 topic_id: u32,
223 start_offset: u64,
224 max_batch_bytes: u32,
225 consumer_id: u64,
226 ) -> Self {
227 let mut payload = Vec::with_capacity(24);
228 payload.extend_from_slice(&topic_id.to_le_bytes());
229 payload.extend_from_slice(&start_offset.to_le_bytes());
230 payload.extend_from_slice(&max_batch_bytes.to_le_bytes());
231 payload.extend_from_slice(&consumer_id.to_le_bytes());
232 Self::new_control(ControlCommand::Subscribe, Some(Bytes::from(payload)))
233 }
234
235 pub fn new_unsubscribe(topic_id: u32, consumer_id: u64) -> Self {
238 let mut payload = Vec::with_capacity(12);
239 payload.extend_from_slice(&topic_id.to_le_bytes());
240 payload.extend_from_slice(&consumer_id.to_le_bytes());
241 Self::new_control(ControlCommand::Unsubscribe, Some(Bytes::from(payload)))
242 }
243
244 pub fn new_commit_offset(topic_id: u32, consumer_id: u64, offset: u64) -> Self {
247 let mut payload = Vec::with_capacity(20);
248 payload.extend_from_slice(&topic_id.to_le_bytes());
249 payload.extend_from_slice(&consumer_id.to_le_bytes());
250 payload.extend_from_slice(&offset.to_le_bytes());
251 Self::new_control(ControlCommand::CommitOffset, Some(Bytes::from(payload)))
252 }
253
254 pub fn new_subscribe_ack(consumer_id: u64, start_offset: u64) -> Self {
257 let mut payload = Vec::with_capacity(16);
258 payload.extend_from_slice(&consumer_id.to_le_bytes());
259 payload.extend_from_slice(&start_offset.to_le_bytes());
260 Self::new_control(ControlCommand::SubscribeAck, Some(Bytes::from(payload)))
261 }
262
263 pub fn new_commit_ack(consumer_id: u64, committed_offset: u64) -> Self {
266 let mut payload = Vec::with_capacity(16);
267 payload.extend_from_slice(&consumer_id.to_le_bytes());
268 payload.extend_from_slice(&committed_offset.to_le_bytes());
269 Self::new_control(ControlCommand::CommitAck, Some(Bytes::from(payload)))
270 }
271
272 pub fn new_topic_response(payload: Bytes) -> Self {
273 Self::new_control(ControlCommand::TopicResponse, Some(payload))
274 }
275
276 pub fn new_error_response(message: &str) -> Self {
277 Self::new_control(
278 ControlCommand::ErrorResponse,
279 Some(Bytes::copy_from_slice(message.as_bytes())),
280 )
281 }
282
283 #[inline]
284 #[must_use]
285 pub fn batch_id(&self) -> u64 {
286 self.header.ingest_header.batch_id
287 }
288
289 #[inline]
290 #[must_use]
291 pub fn payload_length(&self) -> u32 {
292 self.header.ingest_header.payload_length
293 }
294
295 #[inline]
296 #[must_use]
297 pub fn record_count(&self) -> u32 {
298 self.header.ingest_header.record_count
299 }
300
301 #[inline]
302 #[must_use]
303 pub fn topic_id(&self) -> u32 {
304 self.header.ingest_header.topic_id
305 }
306}
307
308pub fn parse_frame(buf: &[u8]) -> Result<Option<(Frame, usize)>> {
309 if buf.len() < LWP_HEADER_SIZE {
310 return Ok(None);
311 }
312
313 let header = LwpHeader::parse(buf)?;
314
315 let frame_type = determine_frame_type(&header);
316 let payload_len = header.ingest_header.payload_length as usize;
317 let total_len = LWP_HEADER_SIZE + payload_len;
318
319 if buf.len() < total_len {
320 return Ok(None);
321 }
322
323 let payload = if payload_len > 0 {
324 let payload_bytes = &buf[LWP_HEADER_SIZE..total_len];
325 header.ingest_header.validate_payload(payload_bytes)?;
326 Some(Bytes::copy_from_slice(payload_bytes))
327 } else {
328 None
329 };
330
331 let frame = Frame {
332 header,
333 payload,
334 frame_type,
335 };
336
337 Ok(Some((frame, total_len)))
338}
339
340#[inline]
343pub fn encode_ack_bytes(batch_id: u64) -> [u8; crate::LWP_HEADER_SIZE] {
344 let frame = Frame::new_ack(batch_id);
345 frame.header.encode()
346}
347
348#[inline]
353pub fn encode_backpressure_bytes() -> [u8; crate::LWP_HEADER_SIZE] {
354 let frame = Frame::new_backpressure();
355 frame.header.encode()
356}
357
358pub fn encode_frame(frame: &Frame) -> Vec<u8> {
359 let header_bytes = frame.header.encode();
360 let payload_len = frame.payload.as_ref().map_or(0, |p| p.len());
361
362 let mut buf = Vec::with_capacity(LWP_HEADER_SIZE + payload_len);
363 buf.extend_from_slice(&header_bytes);
364
365 if let Some(ref payload) = frame.payload {
366 buf.extend_from_slice(payload);
367 }
368
369 buf
370}
371
372fn determine_frame_type(header: &LwpHeader) -> FrameType {
373 if header.has_flag(LwpFlags::Control) {
374 let command = ControlCommand::from(header.ingest_header.batch_id as u8);
375 FrameType::Control(command)
376 } else if header.has_flag(LwpFlags::Keepalive) {
377 FrameType::Keepalive
378 } else if header.has_flag(LwpFlags::Ack) {
379 FrameType::Ack
380 } else if header.has_flag(LwpFlags::Backpressure) {
381 FrameType::Backpressure
382 } else if header.has_flag(LwpFlags::BatchMode) || header.ingest_header.payload_length > 0 {
383 FrameType::Ingest
384 } else {
385 FrameType::Unknown
386 }
387}
388
389#[cfg(test)]
390#[allow(clippy::unwrap_used)]
391mod tests {
392 use super::*;
393
394 #[test]
395 fn test_ingest_frame_roundtrip() {
396 let payload = Bytes::from_static(b"test payload data");
397 let frame = Frame::new_ingest(12345, 1_000_000_000, 1, payload.clone());
398
399 let encoded = encode_frame(&frame);
400 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
401
402 assert_eq!(len, encoded.len());
403 assert_eq!(parsed.frame_type, FrameType::Ingest);
404 assert_eq!(parsed.batch_id(), 12345);
405 assert_eq!(parsed.payload.unwrap(), payload);
406 }
407
408 #[test]
409 fn test_ack_frame_roundtrip() {
410 let frame = Frame::new_ack(54321);
411
412 let encoded = encode_frame(&frame);
413 let (parsed, _) = parse_frame(&encoded).unwrap().unwrap();
414
415 assert_eq!(parsed.frame_type, FrameType::Ack);
416 assert_eq!(parsed.batch_id(), 54321);
417 assert!(parsed.payload.is_none());
418 }
419
420 #[test]
421 fn test_keepalive_frame() {
422 let frame = Frame::new_keepalive();
423
424 let encoded = encode_frame(&frame);
425 let (parsed, _) = parse_frame(&encoded).unwrap().unwrap();
426
427 assert_eq!(parsed.frame_type, FrameType::Keepalive);
428 }
429
430 #[test]
431 fn test_partial_frame() {
432 let payload = Bytes::from_static(b"test");
433 let frame = Frame::new_ingest(1, 0, 1, payload);
434 let encoded = encode_frame(&frame);
435
436 let result = parse_frame(&encoded[..LWP_HEADER_SIZE - 1]).unwrap();
437 assert!(result.is_none());
438
439 let result = parse_frame(&encoded[..LWP_HEADER_SIZE + 2]).unwrap();
440 assert!(result.is_none());
441 }
442
443 #[test]
448 fn test_subscribe_frame_roundtrip() {
449 let topic_id = 42u32;
450 let start_offset = 1000u64;
451 let max_batch_bytes = 65536u32;
452 let consumer_id = 0xDEADBEEF12345678u64;
453
454 let frame = Frame::new_subscribe(topic_id, start_offset, max_batch_bytes, consumer_id);
455
456 let encoded = encode_frame(&frame);
457 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
458
459 assert_eq!(len, encoded.len());
460 assert_eq!(
461 parsed.frame_type,
462 FrameType::Control(ControlCommand::Subscribe)
463 );
464
465 let payload = parsed.payload.unwrap();
467 assert_eq!(payload.len(), 24);
468
469 let parsed_topic_id = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
470 let parsed_offset = u64::from_le_bytes([
471 payload[4],
472 payload[5],
473 payload[6],
474 payload[7],
475 payload[8],
476 payload[9],
477 payload[10],
478 payload[11],
479 ]);
480 let parsed_max_bytes =
481 u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
482 let parsed_consumer_id = u64::from_le_bytes([
483 payload[16],
484 payload[17],
485 payload[18],
486 payload[19],
487 payload[20],
488 payload[21],
489 payload[22],
490 payload[23],
491 ]);
492
493 assert_eq!(parsed_topic_id, topic_id);
494 assert_eq!(parsed_offset, start_offset);
495 assert_eq!(parsed_max_bytes, max_batch_bytes);
496 assert_eq!(parsed_consumer_id, consumer_id);
497 }
498
499 #[test]
500 fn test_unsubscribe_frame_roundtrip() {
501 let topic_id = 7u32;
502 let consumer_id = 0xCAFEBABE00000001u64;
503
504 let frame = Frame::new_unsubscribe(topic_id, consumer_id);
505
506 let encoded = encode_frame(&frame);
507 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
508
509 assert_eq!(len, encoded.len());
510 assert_eq!(
511 parsed.frame_type,
512 FrameType::Control(ControlCommand::Unsubscribe)
513 );
514
515 let payload = parsed.payload.unwrap();
517 assert_eq!(payload.len(), 12);
518
519 let parsed_topic_id = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
520 let parsed_consumer_id = u64::from_le_bytes([
521 payload[4],
522 payload[5],
523 payload[6],
524 payload[7],
525 payload[8],
526 payload[9],
527 payload[10],
528 payload[11],
529 ]);
530
531 assert_eq!(parsed_topic_id, topic_id);
532 assert_eq!(parsed_consumer_id, consumer_id);
533 }
534
535 #[test]
536 fn test_commit_offset_frame_roundtrip() {
537 let topic_id = 99u32;
538 let consumer_id = 0x1234567890ABCDEFu64;
539 let offset = 50000u64;
540
541 let frame = Frame::new_commit_offset(topic_id, consumer_id, offset);
542
543 let encoded = encode_frame(&frame);
544 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
545
546 assert_eq!(len, encoded.len());
547 assert_eq!(
548 parsed.frame_type,
549 FrameType::Control(ControlCommand::CommitOffset)
550 );
551
552 let payload = parsed.payload.unwrap();
554 assert_eq!(payload.len(), 20);
555
556 let parsed_topic_id = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
557 let parsed_consumer_id = u64::from_le_bytes([
558 payload[4],
559 payload[5],
560 payload[6],
561 payload[7],
562 payload[8],
563 payload[9],
564 payload[10],
565 payload[11],
566 ]);
567 let parsed_offset = u64::from_le_bytes([
568 payload[12],
569 payload[13],
570 payload[14],
571 payload[15],
572 payload[16],
573 payload[17],
574 payload[18],
575 payload[19],
576 ]);
577
578 assert_eq!(parsed_topic_id, topic_id);
579 assert_eq!(parsed_consumer_id, consumer_id);
580 assert_eq!(parsed_offset, offset);
581 }
582
583 #[test]
584 fn test_subscribe_ack_frame_roundtrip() {
585 let consumer_id = 0xABCDEF0123456789u64;
586 let start_offset = 12345u64;
587
588 let frame = Frame::new_subscribe_ack(consumer_id, start_offset);
589
590 let encoded = encode_frame(&frame);
591 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
592
593 assert_eq!(len, encoded.len());
594 assert_eq!(
595 parsed.frame_type,
596 FrameType::Control(ControlCommand::SubscribeAck)
597 );
598
599 let payload = parsed.payload.unwrap();
601 assert_eq!(payload.len(), 16);
602
603 let parsed_consumer_id = u64::from_le_bytes([
604 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5], payload[6],
605 payload[7],
606 ]);
607 let parsed_offset = u64::from_le_bytes([
608 payload[8],
609 payload[9],
610 payload[10],
611 payload[11],
612 payload[12],
613 payload[13],
614 payload[14],
615 payload[15],
616 ]);
617
618 assert_eq!(parsed_consumer_id, consumer_id);
619 assert_eq!(parsed_offset, start_offset);
620 }
621
622 #[test]
623 fn test_commit_ack_frame_roundtrip() {
624 let consumer_id = 0xFEDCBA9876543210u64;
625 let committed_offset = 99999u64;
626
627 let frame = Frame::new_commit_ack(consumer_id, committed_offset);
628
629 let encoded = encode_frame(&frame);
630 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
631
632 assert_eq!(len, encoded.len());
633 assert_eq!(
634 parsed.frame_type,
635 FrameType::Control(ControlCommand::CommitAck)
636 );
637
638 let payload = parsed.payload.unwrap();
640 assert_eq!(payload.len(), 16);
641
642 let parsed_consumer_id = u64::from_le_bytes([
643 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5], payload[6],
644 payload[7],
645 ]);
646 let parsed_offset = u64::from_le_bytes([
647 payload[8],
648 payload[9],
649 payload[10],
650 payload[11],
651 payload[12],
652 payload[13],
653 payload[14],
654 payload[15],
655 ]);
656
657 assert_eq!(parsed_consumer_id, consumer_id);
658 assert_eq!(parsed_offset, committed_offset);
659 }
660
661 #[test]
662 fn test_fetch_frame_roundtrip() {
663 let topic_id = 5u32;
664 let start_offset = 1024u64;
665 let max_bytes = 32768u32;
666
667 let frame = Frame::new_fetch(topic_id, start_offset, max_bytes);
668
669 let encoded = encode_frame(&frame);
670 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
671
672 assert_eq!(len, encoded.len());
673 assert_eq!(parsed.frame_type, FrameType::Control(ControlCommand::Fetch));
674
675 let payload = parsed.payload.unwrap();
677 assert_eq!(payload.len(), 16);
678
679 let parsed_topic_id = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
680 let parsed_offset = u64::from_le_bytes([
681 payload[4],
682 payload[5],
683 payload[6],
684 payload[7],
685 payload[8],
686 payload[9],
687 payload[10],
688 payload[11],
689 ]);
690 let parsed_max_bytes =
691 u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
692
693 assert_eq!(parsed_topic_id, topic_id);
694 assert_eq!(parsed_offset, start_offset);
695 assert_eq!(parsed_max_bytes, max_bytes);
696 }
697}