1use lnc_core::{LANCE_MAGIC, LanceError, Result, crc32c};
2use zerocopy::{FromBytes, Immutable, IntoBytes};
3
4pub const PROTOCOL_VERSION: u8 = 1;
5
6#[repr(u8)]
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum LwpFlags {
9 None = 0x00,
10 Compressed = 0x01,
11 Encrypted = 0x02,
12 BatchMode = 0x04,
13 Ack = 0x08,
14 Backpressure = 0x10,
15 Keepalive = 0x20,
16 Control = 0x40,
17}
18
19impl From<u8> for LwpFlags {
20 fn from(value: u8) -> Self {
21 match value {
22 0x01 => Self::Compressed,
23 0x02 => Self::Encrypted,
24 0x04 => Self::BatchMode,
25 0x08 => Self::Ack,
26 0x10 => Self::Backpressure,
27 0x20 => Self::Keepalive,
28 0x40 => Self::Control,
29 _ => Self::None,
30 }
31 }
32}
33
34#[repr(u8)]
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub enum ControlCommand {
37 CreateTopic = 0x01,
39 DeleteTopic = 0x02,
40 ListTopics = 0x03,
41 GetTopic = 0x04,
42 SetRetention = 0x05,
44 CreateTopicWithRetention = 0x06,
46
47 GetClusterStatus = 0x07,
50 Authenticate = 0x08,
52 AuthenticateResponse = 0x09,
54
55 Fetch = 0x10,
57 FetchResponse = 0x11,
58 CatchingUp = 0x12,
60
61 Subscribe = 0x20,
64 Unsubscribe = 0x21,
66 CommitOffset = 0x22,
68 SubscribeAck = 0x23,
70 CommitAck = 0x24,
72
73 TopicResponse = 0x80,
75 ClusterStatusResponse = 0x81,
77 ErrorResponse = 0xFF,
78}
79
80impl From<u8> for ControlCommand {
81 fn from(value: u8) -> Self {
82 match value {
83 0x01 => Self::CreateTopic,
84 0x02 => Self::DeleteTopic,
85 0x03 => Self::ListTopics,
86 0x04 => Self::GetTopic,
87 0x05 => Self::SetRetention,
88 0x06 => Self::CreateTopicWithRetention,
89 0x07 => Self::GetClusterStatus,
90 0x08 => Self::Authenticate,
91 0x09 => Self::AuthenticateResponse,
92 0x10 => Self::Fetch,
93 0x11 => Self::FetchResponse,
94 0x12 => Self::CatchingUp,
95 0x20 => Self::Subscribe,
96 0x21 => Self::Unsubscribe,
97 0x22 => Self::CommitOffset,
98 0x23 => Self::SubscribeAck,
99 0x24 => Self::CommitAck,
100 0x80 => Self::TopicResponse,
101 0x81 => Self::ClusterStatusResponse,
102 0xFF => Self::ErrorResponse,
103 _ => Self::ErrorResponse,
104 }
105 }
106}
107
108#[derive(Debug, Clone, Copy)]
109pub struct LwpHeader {
110 pub magic: [u8; 4],
111 pub version: u8,
112 pub flags: u8,
113 pub reserved: [u8; 2],
114 pub header_crc: u32,
115 pub ingest_header: IngestHeader,
116}
117
118impl LwpHeader {
119 pub const SIZE: usize = 44;
120
121 pub fn new(flags: u8, ingest_header: IngestHeader) -> Self {
122 let mut header = Self {
123 magic: LANCE_MAGIC,
124 version: PROTOCOL_VERSION,
125 flags,
126 reserved: [0; 2],
127 header_crc: 0,
128 ingest_header,
129 };
130
131 let mut crc_buf = [0u8; 8];
132 crc_buf[0..4].copy_from_slice(&header.magic);
133 crc_buf[4] = header.version;
134 crc_buf[5] = header.flags;
135 crc_buf[6..8].copy_from_slice(&header.reserved);
136 header.header_crc = crc32c(&crc_buf);
137
138 header
139 }
140
141 pub fn parse(buf: &[u8]) -> Result<Self> {
142 if buf.len() < Self::SIZE {
143 return Err(LanceError::Protocol(
144 "Buffer too small for LWP header".into(),
145 ));
146 }
147
148 if buf[0..4] != LANCE_MAGIC {
149 return Err(LanceError::InvalidMagic);
150 }
151
152 let version = buf[4];
153 if version != PROTOCOL_VERSION {
154 return Err(LanceError::Protocol(format!(
155 "Unsupported protocol version: {}",
156 version
157 )));
158 }
159
160 let stored_crc = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);
161 let computed_crc = crc32c(&buf[0..8]);
162
163 if stored_crc != computed_crc {
164 return Err(LanceError::CrcMismatch {
165 expected: stored_crc,
166 actual: computed_crc,
167 });
168 }
169
170 let mut magic = [0u8; 4];
171 magic.copy_from_slice(&buf[0..4]);
172
173 let ingest_header = IngestHeader::parse(&buf[12..Self::SIZE])?;
174
175 Ok(Self {
176 magic,
177 version: buf[4],
178 flags: buf[5],
179 reserved: [buf[6], buf[7]],
180 header_crc: stored_crc,
181 ingest_header,
182 })
183 }
184
185 pub fn encode(&self) -> [u8; Self::SIZE] {
186 let mut buf = [0u8; Self::SIZE];
187 buf[0..4].copy_from_slice(&self.magic);
188 buf[4] = self.version;
189 buf[5] = self.flags;
190 buf[6..8].copy_from_slice(&self.reserved);
191 buf[8..12].copy_from_slice(&self.header_crc.to_le_bytes());
192 self.ingest_header.encode_into(&mut buf[12..Self::SIZE]);
193 buf
194 }
195
196 #[inline]
197 pub fn has_flag(&self, flag: LwpFlags) -> bool {
198 self.flags & (flag as u8) != 0
199 }
200
201 #[inline]
202 pub fn is_keepalive(&self) -> bool {
203 self.has_flag(LwpFlags::Keepalive)
204 }
205
206 #[inline]
207 pub fn is_batch_mode(&self) -> bool {
208 self.has_flag(LwpFlags::BatchMode)
209 }
210
211 #[inline]
212 pub fn is_compressed(&self) -> bool {
213 self.has_flag(LwpFlags::Compressed)
214 }
215
216 #[inline]
217 pub fn is_control(&self) -> bool {
218 self.has_flag(LwpFlags::Control)
219 }
220
221 pub fn keepalive() -> Self {
223 Self::new(LwpFlags::Keepalive as u8, IngestHeader::default())
224 }
225}
226
227#[repr(C)]
233#[derive(Debug, Clone, Copy, Default, FromBytes, Immutable, IntoBytes)]
234pub struct IngestHeader {
235 pub batch_id: u64,
236 pub timestamp_ns: u64,
237 pub record_count: u32,
238 pub payload_length: u32,
239 pub payload_crc: u32,
240 pub topic_id: u32,
241}
242
243impl IngestHeader {
244 pub const SIZE: usize = 32;
245
246 pub fn new(
247 batch_id: u64,
248 timestamp_ns: u64,
249 record_count: u32,
250 payload_length: u32,
251 payload_crc: u32,
252 ) -> Self {
253 Self {
254 batch_id,
255 timestamp_ns,
256 record_count,
257 payload_length,
258 payload_crc,
259 topic_id: 0,
260 }
261 }
262
263 pub fn with_topic(
264 batch_id: u64,
265 timestamp_ns: u64,
266 record_count: u32,
267 payload_length: u32,
268 payload_crc: u32,
269 topic_id: u32,
270 ) -> Self {
271 Self {
272 batch_id,
273 timestamp_ns,
274 record_count,
275 payload_length,
276 payload_crc,
277 topic_id,
278 }
279 }
280
281 pub fn parse(buf: &[u8]) -> Result<Self> {
287 if buf.len() < Self::SIZE {
288 return Err(LanceError::Protocol(
289 "Buffer too small for IngestHeader".into(),
290 ));
291 }
292
293 #[cfg(target_endian = "little")]
294 {
295 if let Ok(header) = Self::read_from_bytes(&buf[..Self::SIZE]) {
299 return Ok(header);
300 }
301 }
302
303 Ok(Self {
305 batch_id: u64::from_le_bytes([
306 buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
307 ]),
308 timestamp_ns: u64::from_le_bytes([
309 buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
310 ]),
311 record_count: u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]),
312 payload_length: u32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]),
313 payload_crc: u32::from_le_bytes([buf[24], buf[25], buf[26], buf[27]]),
314 topic_id: u32::from_le_bytes([buf[28], buf[29], buf[30], buf[31]]),
315 })
316 }
317
318 pub fn encode_into(&self, buf: &mut [u8]) {
322 #[cfg(target_endian = "little")]
323 {
324 let src = self.as_bytes();
325 buf[..Self::SIZE].copy_from_slice(src);
326 }
327
328 #[cfg(target_endian = "big")]
329 {
330 buf[0..8].copy_from_slice(&self.batch_id.to_le_bytes());
331 buf[8..16].copy_from_slice(&self.timestamp_ns.to_le_bytes());
332 buf[16..20].copy_from_slice(&self.record_count.to_le_bytes());
333 buf[20..24].copy_from_slice(&self.payload_length.to_le_bytes());
334 buf[24..28].copy_from_slice(&self.payload_crc.to_le_bytes());
335 buf[28..32].copy_from_slice(&self.topic_id.to_le_bytes());
336 }
337 }
338
339 pub fn validate_payload(&self, payload: &[u8]) -> Result<()> {
340 if payload.len() != self.payload_length as usize {
341 return Err(LanceError::Protocol(format!(
342 "Payload length mismatch: expected {}, got {}",
343 self.payload_length,
344 payload.len()
345 )));
346 }
347
348 let actual_crc = crc32c(payload);
349 if actual_crc != self.payload_crc {
350 lnc_metrics::increment_crc_failures();
351 return Err(LanceError::CrcMismatch {
352 expected: self.payload_crc,
353 actual: actual_crc,
354 });
355 }
356
357 Ok(())
358 }
359}
360
361#[cfg(test)]
362#[allow(clippy::unwrap_used)]
363mod tests {
364 use super::*;
365
366 #[test]
367 fn test_lwp_header_roundtrip() {
368 let ingest = IngestHeader::new(12345, 1_000_000_000, 100, 4096, 0xDEADBEEF);
369 let header = LwpHeader::new(LwpFlags::BatchMode as u8, ingest);
370
371 let encoded = header.encode();
372 let parsed = LwpHeader::parse(&encoded).unwrap();
373
374 assert_eq!(parsed.magic, LANCE_MAGIC);
375 assert_eq!(parsed.version, PROTOCOL_VERSION);
376 assert_eq!(parsed.ingest_header.batch_id, 12345);
377 assert_eq!(parsed.ingest_header.record_count, 100);
378 }
379
380 #[test]
381 fn test_invalid_magic() {
382 let mut buf = [0u8; LwpHeader::SIZE];
383 buf[0..4].copy_from_slice(b"JUNK");
384
385 let result = LwpHeader::parse(&buf);
386 assert!(matches!(result, Err(LanceError::InvalidMagic)));
387 }
388
389 #[test]
390 fn test_keepalive() {
391 let header = LwpHeader::keepalive();
392 assert!(header.is_keepalive());
393 assert!(!header.is_batch_mode());
394 }
395
396 #[test]
397 fn test_payload_validation() {
398 let payload = b"test payload data";
399 let crc = crc32c(payload);
400
401 let ingest = IngestHeader::new(1, 0, 1, payload.len() as u32, crc);
402 assert!(ingest.validate_payload(payload).is_ok());
403
404 let wrong_crc_ingest = IngestHeader::new(1, 0, 1, payload.len() as u32, 0xBADBAD);
405 assert!(wrong_crc_ingest.validate_payload(payload).is_err());
406 }
407
408 #[test]
413 fn test_control_command_topic_management() {
414 assert_eq!(ControlCommand::from(0x01), ControlCommand::CreateTopic);
415 assert_eq!(ControlCommand::from(0x02), ControlCommand::DeleteTopic);
416 assert_eq!(ControlCommand::from(0x03), ControlCommand::ListTopics);
417 assert_eq!(ControlCommand::from(0x04), ControlCommand::GetTopic);
418 }
419
420 #[test]
421 fn test_control_command_fetch() {
422 assert_eq!(ControlCommand::from(0x10), ControlCommand::Fetch);
423 assert_eq!(ControlCommand::from(0x11), ControlCommand::FetchResponse);
424 }
425
426 #[test]
427 fn test_control_command_streaming() {
428 assert_eq!(ControlCommand::from(0x20), ControlCommand::Subscribe);
429 assert_eq!(ControlCommand::from(0x21), ControlCommand::Unsubscribe);
430 assert_eq!(ControlCommand::from(0x22), ControlCommand::CommitOffset);
431 assert_eq!(ControlCommand::from(0x23), ControlCommand::SubscribeAck);
432 assert_eq!(ControlCommand::from(0x24), ControlCommand::CommitAck);
433 }
434
435 #[test]
436 fn test_control_command_responses() {
437 assert_eq!(ControlCommand::from(0x80), ControlCommand::TopicResponse);
438 assert_eq!(ControlCommand::from(0xFF), ControlCommand::ErrorResponse);
439 }
440
441 #[test]
442 fn test_control_command_authentication() {
443 assert_eq!(ControlCommand::from(0x08), ControlCommand::Authenticate);
444 assert_eq!(
445 ControlCommand::from(0x09),
446 ControlCommand::AuthenticateResponse
447 );
448 assert_eq!(ControlCommand::Authenticate as u8, 0x08);
449 assert_eq!(ControlCommand::AuthenticateResponse as u8, 0x09);
450 }
451
452 #[test]
453 fn test_control_command_unknown_defaults_to_error() {
454 assert_eq!(ControlCommand::from(0x00), ControlCommand::ErrorResponse);
456 assert_eq!(ControlCommand::from(0x99), ControlCommand::ErrorResponse);
457 assert_eq!(ControlCommand::from(0xFE), ControlCommand::ErrorResponse);
458 }
459
460 #[test]
461 fn test_control_command_roundtrip() {
462 assert_eq!(ControlCommand::Subscribe as u8, 0x20);
464 assert_eq!(ControlCommand::Unsubscribe as u8, 0x21);
465 assert_eq!(ControlCommand::CommitOffset as u8, 0x22);
466 assert_eq!(ControlCommand::SubscribeAck as u8, 0x23);
467 assert_eq!(ControlCommand::CommitAck as u8, 0x24);
468 }
469}