1use crate::LWP_HEADER_SIZE;
2use crate::protocol::{ControlCommand, IngestHeader, LwpFlags, LwpHeader};
3use bytes::Bytes;
4use lnc_core::{Result, crc32c};
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum FrameType {
8 Ingest,
9 Ack,
10 Backpressure,
11 Keepalive,
12 Control(ControlCommand),
13 Unknown,
14}
15
16pub struct Frame {
17 pub header: LwpHeader,
18 pub payload: Option<Bytes>,
19 pub frame_type: FrameType,
20}
21
22impl Frame {
23 pub fn new_ingest(batch_id: u64, timestamp_ns: u64, record_count: u32, payload: Bytes) -> Self {
24 Self::new_ingest_with_topic(batch_id, timestamp_ns, record_count, payload, 0)
25 }
26
27 pub fn new_ingest_with_topic(
28 batch_id: u64,
29 timestamp_ns: u64,
30 record_count: u32,
31 payload: Bytes,
32 topic_id: u32,
33 ) -> Self {
34 let payload_crc = crc32c(&payload);
35 let ingest = IngestHeader::with_topic(
36 batch_id,
37 timestamp_ns,
38 record_count,
39 payload.len() as u32,
40 payload_crc,
41 topic_id,
42 );
43 let header = LwpHeader::new(LwpFlags::BatchMode as u8, ingest);
44
45 Self {
46 header,
47 payload: Some(payload),
48 frame_type: FrameType::Ingest,
49 }
50 }
51
52 pub fn new_ack(batch_id: u64) -> Self {
53 let ingest = IngestHeader {
54 batch_id,
55 ..Default::default()
56 };
57 let header = LwpHeader::new(LwpFlags::Ack as u8, ingest);
58
59 Self {
60 header,
61 payload: None,
62 frame_type: FrameType::Ack,
63 }
64 }
65
66 pub fn new_keepalive() -> Self {
67 let header = LwpHeader::new(LwpFlags::Keepalive as u8, IngestHeader::default());
68
69 Self {
70 header,
71 payload: None,
72 frame_type: FrameType::Keepalive,
73 }
74 }
75
76 pub fn new_backpressure() -> Self {
77 let header = LwpHeader::new(LwpFlags::Backpressure as u8, IngestHeader::default());
78
79 Self {
80 header,
81 payload: None,
82 frame_type: FrameType::Backpressure,
83 }
84 }
85
86 pub fn new_control(command: ControlCommand, payload: Option<Bytes>) -> Self {
87 let (payload_len, payload_crc) = match &payload {
88 Some(p) => (p.len() as u32, crc32c(p)),
89 None => (0, 0),
90 };
91
92 let ingest = IngestHeader {
93 payload_length: payload_len,
94 payload_crc,
95 batch_id: command as u64,
96 ..Default::default()
97 };
98
99 let header = LwpHeader::new(LwpFlags::Control as u8, ingest);
100
101 Self {
102 header,
103 payload,
104 frame_type: FrameType::Control(command),
105 }
106 }
107
108 pub fn new_create_topic(topic_name: &str) -> Self {
109 Self::new_control(
110 ControlCommand::CreateTopic,
111 Some(Bytes::copy_from_slice(topic_name.as_bytes())),
112 )
113 }
114
115 pub fn new_list_topics() -> Self {
116 Self::new_control(ControlCommand::ListTopics, None)
117 }
118
119 pub fn new_get_topic(topic_id: u32) -> Self {
120 Self::new_control(
121 ControlCommand::GetTopic,
122 Some(Bytes::copy_from_slice(&topic_id.to_le_bytes())),
123 )
124 }
125
126 pub fn new_delete_topic(topic_id: u32) -> Self {
127 Self::new_control(
128 ControlCommand::DeleteTopic,
129 Some(Bytes::copy_from_slice(&topic_id.to_le_bytes())),
130 )
131 }
132
133 pub fn new_set_retention(topic_id: u32, max_age_secs: u64, max_bytes: u64) -> Self {
136 let mut payload = Vec::with_capacity(20);
137 payload.extend_from_slice(&topic_id.to_le_bytes());
138 payload.extend_from_slice(&max_age_secs.to_le_bytes());
139 payload.extend_from_slice(&max_bytes.to_le_bytes());
140 Self::new_control(ControlCommand::SetRetention, Some(Bytes::from(payload)))
141 }
142
143 pub fn new_create_topic_with_retention(
146 topic_name: &str,
147 max_age_secs: u64,
148 max_bytes: u64,
149 ) -> Self {
150 let name_bytes = topic_name.as_bytes();
151 let mut payload = Vec::with_capacity(2 + name_bytes.len() + 16);
152 payload.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
153 payload.extend_from_slice(name_bytes);
154 payload.extend_from_slice(&max_age_secs.to_le_bytes());
155 payload.extend_from_slice(&max_bytes.to_le_bytes());
156 Self::new_control(
157 ControlCommand::CreateTopicWithRetention,
158 Some(Bytes::from(payload)),
159 )
160 }
161
162 pub fn new_fetch(topic_id: u32, start_offset: u64, max_bytes: u32) -> Self {
165 let mut payload = Vec::with_capacity(16);
166 payload.extend_from_slice(&topic_id.to_le_bytes());
167 payload.extend_from_slice(&start_offset.to_le_bytes());
168 payload.extend_from_slice(&max_bytes.to_le_bytes());
169 Self::new_control(ControlCommand::Fetch, Some(Bytes::from(payload)))
170 }
171
172 pub fn new_fetch_response(payload: Bytes) -> Self {
173 Self::new_control(ControlCommand::FetchResponse, Some(payload))
174 }
175
176 pub fn new_get_cluster_status() -> Self {
178 Self::new_control(ControlCommand::GetClusterStatus, None)
179 }
180
181 pub fn new_cluster_status_response(payload: Bytes) -> Self {
183 Self::new_control(ControlCommand::ClusterStatusResponse, Some(payload))
184 }
185
186 pub fn new_authenticate(token: &str) -> Self {
189 Self::new_control(
190 ControlCommand::Authenticate,
191 Some(Bytes::copy_from_slice(token.as_bytes())),
192 )
193 }
194
195 pub fn new_authenticate_response(success: bool, message: Option<&str>) -> Self {
198 let mut payload = Vec::with_capacity(1 + message.map_or(0, |m| m.len()));
199 payload.push(if success { 1 } else { 0 });
200 if let Some(msg) = message {
201 payload.extend_from_slice(msg.as_bytes());
202 }
203 Self::new_control(
204 ControlCommand::AuthenticateResponse,
205 Some(Bytes::from(payload)),
206 )
207 }
208
209 pub fn new_subscribe(
212 topic_id: u32,
213 start_offset: u64,
214 max_batch_bytes: u32,
215 consumer_id: u64,
216 ) -> Self {
217 let mut payload = Vec::with_capacity(24);
218 payload.extend_from_slice(&topic_id.to_le_bytes());
219 payload.extend_from_slice(&start_offset.to_le_bytes());
220 payload.extend_from_slice(&max_batch_bytes.to_le_bytes());
221 payload.extend_from_slice(&consumer_id.to_le_bytes());
222 Self::new_control(ControlCommand::Subscribe, Some(Bytes::from(payload)))
223 }
224
225 pub fn new_unsubscribe(topic_id: u32, consumer_id: u64) -> Self {
228 let mut payload = Vec::with_capacity(12);
229 payload.extend_from_slice(&topic_id.to_le_bytes());
230 payload.extend_from_slice(&consumer_id.to_le_bytes());
231 Self::new_control(ControlCommand::Unsubscribe, Some(Bytes::from(payload)))
232 }
233
234 pub fn new_commit_offset(topic_id: u32, consumer_id: u64, offset: u64) -> Self {
237 let mut payload = Vec::with_capacity(20);
238 payload.extend_from_slice(&topic_id.to_le_bytes());
239 payload.extend_from_slice(&consumer_id.to_le_bytes());
240 payload.extend_from_slice(&offset.to_le_bytes());
241 Self::new_control(ControlCommand::CommitOffset, Some(Bytes::from(payload)))
242 }
243
244 pub fn new_subscribe_ack(consumer_id: u64, start_offset: u64) -> Self {
247 let mut payload = Vec::with_capacity(16);
248 payload.extend_from_slice(&consumer_id.to_le_bytes());
249 payload.extend_from_slice(&start_offset.to_le_bytes());
250 Self::new_control(ControlCommand::SubscribeAck, Some(Bytes::from(payload)))
251 }
252
253 pub fn new_commit_ack(consumer_id: u64, committed_offset: u64) -> Self {
256 let mut payload = Vec::with_capacity(16);
257 payload.extend_from_slice(&consumer_id.to_le_bytes());
258 payload.extend_from_slice(&committed_offset.to_le_bytes());
259 Self::new_control(ControlCommand::CommitAck, Some(Bytes::from(payload)))
260 }
261
262 pub fn new_topic_response(payload: Bytes) -> Self {
263 Self::new_control(ControlCommand::TopicResponse, Some(payload))
264 }
265
266 pub fn new_error_response(message: &str) -> Self {
267 Self::new_control(
268 ControlCommand::ErrorResponse,
269 Some(Bytes::copy_from_slice(message.as_bytes())),
270 )
271 }
272
273 #[inline]
274 #[must_use]
275 pub fn batch_id(&self) -> u64 {
276 self.header.ingest_header.batch_id
277 }
278
279 #[inline]
280 #[must_use]
281 pub fn payload_length(&self) -> u32 {
282 self.header.ingest_header.payload_length
283 }
284
285 #[inline]
286 #[must_use]
287 pub fn record_count(&self) -> u32 {
288 self.header.ingest_header.record_count
289 }
290
291 #[inline]
292 #[must_use]
293 pub fn topic_id(&self) -> u32 {
294 self.header.ingest_header.topic_id
295 }
296}
297
298pub fn parse_frame(buf: &[u8]) -> Result<Option<(Frame, usize)>> {
299 if buf.len() < LWP_HEADER_SIZE {
300 return Ok(None);
301 }
302
303 let header = LwpHeader::parse(buf)?;
304
305 let frame_type = determine_frame_type(&header);
306 let payload_len = header.ingest_header.payload_length as usize;
307 let total_len = LWP_HEADER_SIZE + payload_len;
308
309 if buf.len() < total_len {
310 return Ok(None);
311 }
312
313 let payload = if payload_len > 0 {
314 let payload_bytes = &buf[LWP_HEADER_SIZE..total_len];
315 header.ingest_header.validate_payload(payload_bytes)?;
316 Some(Bytes::copy_from_slice(payload_bytes))
317 } else {
318 None
319 };
320
321 let frame = Frame {
322 header,
323 payload,
324 frame_type,
325 };
326
327 Ok(Some((frame, total_len)))
328}
329
330pub fn encode_frame(frame: &Frame) -> Vec<u8> {
331 let header_bytes = frame.header.encode();
332 let payload_len = frame.payload.as_ref().map_or(0, |p| p.len());
333
334 let mut buf = Vec::with_capacity(LWP_HEADER_SIZE + payload_len);
335 buf.extend_from_slice(&header_bytes);
336
337 if let Some(ref payload) = frame.payload {
338 buf.extend_from_slice(payload);
339 }
340
341 buf
342}
343
344fn determine_frame_type(header: &LwpHeader) -> FrameType {
345 if header.has_flag(LwpFlags::Control) {
346 let command = ControlCommand::from(header.ingest_header.batch_id as u8);
347 FrameType::Control(command)
348 } else if header.has_flag(LwpFlags::Keepalive) {
349 FrameType::Keepalive
350 } else if header.has_flag(LwpFlags::Ack) {
351 FrameType::Ack
352 } else if header.has_flag(LwpFlags::Backpressure) {
353 FrameType::Backpressure
354 } else if header.has_flag(LwpFlags::BatchMode) || header.ingest_header.payload_length > 0 {
355 FrameType::Ingest
356 } else {
357 FrameType::Unknown
358 }
359}
360
361#[cfg(test)]
362#[allow(clippy::unwrap_used)]
363mod tests {
364 use super::*;
365
366 #[test]
367 fn test_ingest_frame_roundtrip() {
368 let payload = Bytes::from_static(b"test payload data");
369 let frame = Frame::new_ingest(12345, 1_000_000_000, 1, payload.clone());
370
371 let encoded = encode_frame(&frame);
372 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
373
374 assert_eq!(len, encoded.len());
375 assert_eq!(parsed.frame_type, FrameType::Ingest);
376 assert_eq!(parsed.batch_id(), 12345);
377 assert_eq!(parsed.payload.unwrap(), payload);
378 }
379
380 #[test]
381 fn test_ack_frame_roundtrip() {
382 let frame = Frame::new_ack(54321);
383
384 let encoded = encode_frame(&frame);
385 let (parsed, _) = parse_frame(&encoded).unwrap().unwrap();
386
387 assert_eq!(parsed.frame_type, FrameType::Ack);
388 assert_eq!(parsed.batch_id(), 54321);
389 assert!(parsed.payload.is_none());
390 }
391
392 #[test]
393 fn test_keepalive_frame() {
394 let frame = Frame::new_keepalive();
395
396 let encoded = encode_frame(&frame);
397 let (parsed, _) = parse_frame(&encoded).unwrap().unwrap();
398
399 assert_eq!(parsed.frame_type, FrameType::Keepalive);
400 }
401
402 #[test]
403 fn test_partial_frame() {
404 let payload = Bytes::from_static(b"test");
405 let frame = Frame::new_ingest(1, 0, 1, payload);
406 let encoded = encode_frame(&frame);
407
408 let result = parse_frame(&encoded[..LWP_HEADER_SIZE - 1]).unwrap();
409 assert!(result.is_none());
410
411 let result = parse_frame(&encoded[..LWP_HEADER_SIZE + 2]).unwrap();
412 assert!(result.is_none());
413 }
414
415 #[test]
420 fn test_subscribe_frame_roundtrip() {
421 let topic_id = 42u32;
422 let start_offset = 1000u64;
423 let max_batch_bytes = 65536u32;
424 let consumer_id = 0xDEADBEEF12345678u64;
425
426 let frame = Frame::new_subscribe(topic_id, start_offset, max_batch_bytes, consumer_id);
427
428 let encoded = encode_frame(&frame);
429 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
430
431 assert_eq!(len, encoded.len());
432 assert_eq!(
433 parsed.frame_type,
434 FrameType::Control(ControlCommand::Subscribe)
435 );
436
437 let payload = parsed.payload.unwrap();
439 assert_eq!(payload.len(), 24);
440
441 let parsed_topic_id = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
442 let parsed_offset = u64::from_le_bytes([
443 payload[4],
444 payload[5],
445 payload[6],
446 payload[7],
447 payload[8],
448 payload[9],
449 payload[10],
450 payload[11],
451 ]);
452 let parsed_max_bytes =
453 u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
454 let parsed_consumer_id = u64::from_le_bytes([
455 payload[16],
456 payload[17],
457 payload[18],
458 payload[19],
459 payload[20],
460 payload[21],
461 payload[22],
462 payload[23],
463 ]);
464
465 assert_eq!(parsed_topic_id, topic_id);
466 assert_eq!(parsed_offset, start_offset);
467 assert_eq!(parsed_max_bytes, max_batch_bytes);
468 assert_eq!(parsed_consumer_id, consumer_id);
469 }
470
471 #[test]
472 fn test_unsubscribe_frame_roundtrip() {
473 let topic_id = 7u32;
474 let consumer_id = 0xCAFEBABE00000001u64;
475
476 let frame = Frame::new_unsubscribe(topic_id, consumer_id);
477
478 let encoded = encode_frame(&frame);
479 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
480
481 assert_eq!(len, encoded.len());
482 assert_eq!(
483 parsed.frame_type,
484 FrameType::Control(ControlCommand::Unsubscribe)
485 );
486
487 let payload = parsed.payload.unwrap();
489 assert_eq!(payload.len(), 12);
490
491 let parsed_topic_id = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
492 let parsed_consumer_id = u64::from_le_bytes([
493 payload[4],
494 payload[5],
495 payload[6],
496 payload[7],
497 payload[8],
498 payload[9],
499 payload[10],
500 payload[11],
501 ]);
502
503 assert_eq!(parsed_topic_id, topic_id);
504 assert_eq!(parsed_consumer_id, consumer_id);
505 }
506
507 #[test]
508 fn test_commit_offset_frame_roundtrip() {
509 let topic_id = 99u32;
510 let consumer_id = 0x1234567890ABCDEFu64;
511 let offset = 50000u64;
512
513 let frame = Frame::new_commit_offset(topic_id, consumer_id, offset);
514
515 let encoded = encode_frame(&frame);
516 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
517
518 assert_eq!(len, encoded.len());
519 assert_eq!(
520 parsed.frame_type,
521 FrameType::Control(ControlCommand::CommitOffset)
522 );
523
524 let payload = parsed.payload.unwrap();
526 assert_eq!(payload.len(), 20);
527
528 let parsed_topic_id = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
529 let parsed_consumer_id = u64::from_le_bytes([
530 payload[4],
531 payload[5],
532 payload[6],
533 payload[7],
534 payload[8],
535 payload[9],
536 payload[10],
537 payload[11],
538 ]);
539 let parsed_offset = u64::from_le_bytes([
540 payload[12],
541 payload[13],
542 payload[14],
543 payload[15],
544 payload[16],
545 payload[17],
546 payload[18],
547 payload[19],
548 ]);
549
550 assert_eq!(parsed_topic_id, topic_id);
551 assert_eq!(parsed_consumer_id, consumer_id);
552 assert_eq!(parsed_offset, offset);
553 }
554
555 #[test]
556 fn test_subscribe_ack_frame_roundtrip() {
557 let consumer_id = 0xABCDEF0123456789u64;
558 let start_offset = 12345u64;
559
560 let frame = Frame::new_subscribe_ack(consumer_id, start_offset);
561
562 let encoded = encode_frame(&frame);
563 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
564
565 assert_eq!(len, encoded.len());
566 assert_eq!(
567 parsed.frame_type,
568 FrameType::Control(ControlCommand::SubscribeAck)
569 );
570
571 let payload = parsed.payload.unwrap();
573 assert_eq!(payload.len(), 16);
574
575 let parsed_consumer_id = u64::from_le_bytes([
576 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5], payload[6],
577 payload[7],
578 ]);
579 let parsed_offset = u64::from_le_bytes([
580 payload[8],
581 payload[9],
582 payload[10],
583 payload[11],
584 payload[12],
585 payload[13],
586 payload[14],
587 payload[15],
588 ]);
589
590 assert_eq!(parsed_consumer_id, consumer_id);
591 assert_eq!(parsed_offset, start_offset);
592 }
593
594 #[test]
595 fn test_commit_ack_frame_roundtrip() {
596 let consumer_id = 0xFEDCBA9876543210u64;
597 let committed_offset = 99999u64;
598
599 let frame = Frame::new_commit_ack(consumer_id, committed_offset);
600
601 let encoded = encode_frame(&frame);
602 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
603
604 assert_eq!(len, encoded.len());
605 assert_eq!(
606 parsed.frame_type,
607 FrameType::Control(ControlCommand::CommitAck)
608 );
609
610 let payload = parsed.payload.unwrap();
612 assert_eq!(payload.len(), 16);
613
614 let parsed_consumer_id = u64::from_le_bytes([
615 payload[0], payload[1], payload[2], payload[3], payload[4], payload[5], payload[6],
616 payload[7],
617 ]);
618 let parsed_offset = u64::from_le_bytes([
619 payload[8],
620 payload[9],
621 payload[10],
622 payload[11],
623 payload[12],
624 payload[13],
625 payload[14],
626 payload[15],
627 ]);
628
629 assert_eq!(parsed_consumer_id, consumer_id);
630 assert_eq!(parsed_offset, committed_offset);
631 }
632
633 #[test]
634 fn test_fetch_frame_roundtrip() {
635 let topic_id = 5u32;
636 let start_offset = 1024u64;
637 let max_bytes = 32768u32;
638
639 let frame = Frame::new_fetch(topic_id, start_offset, max_bytes);
640
641 let encoded = encode_frame(&frame);
642 let (parsed, len) = parse_frame(&encoded).unwrap().unwrap();
643
644 assert_eq!(len, encoded.len());
645 assert_eq!(parsed.frame_type, FrameType::Control(ControlCommand::Fetch));
646
647 let payload = parsed.payload.unwrap();
649 assert_eq!(payload.len(), 16);
650
651 let parsed_topic_id = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
652 let parsed_offset = u64::from_le_bytes([
653 payload[4],
654 payload[5],
655 payload[6],
656 payload[7],
657 payload[8],
658 payload[9],
659 payload[10],
660 payload[11],
661 ]);
662 let parsed_max_bytes =
663 u32::from_le_bytes([payload[12], payload[13], payload[14], payload[15]]);
664
665 assert_eq!(parsed_topic_id, topic_id);
666 assert_eq!(parsed_offset, start_offset);
667 assert_eq!(parsed_max_bytes, max_bytes);
668 }
669}