1use lnc_core::{LANCE_MAGIC, LanceError, Result, crc32c};
2
/// Protocol version byte written into newly built headers and required of
/// every header accepted by the parser in this module.
pub const PROTOCOL_VERSION: u8 = 1;
4
/// Named bit values for the one-byte `flags` field of an LWP header.
///
/// Every variant except [`LwpFlags::None`] occupies its own bit, so the
/// variants can be used as masks against a raw flags byte.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LwpFlags {
    /// No bit set (`0x00`).
    None = 0,
    /// Bit 0 (`0x01`).
    Compressed = 1 << 0,
    /// Bit 1 (`0x02`).
    Encrypted = 1 << 1,
    /// Bit 2 (`0x04`).
    BatchMode = 1 << 2,
    /// Bit 3 (`0x08`).
    Ack = 1 << 3,
    /// Bit 4 (`0x10`).
    Backpressure = 1 << 4,
    /// Bit 5 (`0x20`).
    Keepalive = 1 << 5,
    /// Bit 6 (`0x40`).
    Control = 1 << 6,
}

impl From<u8> for LwpFlags {
    /// Converts a byte that equals exactly one flag value into that flag.
    ///
    /// Every other byte — zero, a combination of several bits, or the
    /// unassigned bit `0x80` — converts to [`LwpFlags::None`].
    fn from(value: u8) -> Self {
        // The discriminants are the single source of truth for the bit values.
        const SINGLE_BIT_FLAGS: [LwpFlags; 7] = [
            LwpFlags::Compressed,
            LwpFlags::Encrypted,
            LwpFlags::BatchMode,
            LwpFlags::Ack,
            LwpFlags::Backpressure,
            LwpFlags::Keepalive,
            LwpFlags::Control,
        ];

        SINGLE_BIT_FLAGS
            .iter()
            .copied()
            .find(|flag| *flag as u8 == value)
            .unwrap_or(Self::None)
    }
}
32
/// Control command identifiers, each with a fixed one-byte value.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlCommand {
    // Topic management.
    CreateTopic = 0x01,
    DeleteTopic = 0x02,
    ListTopics = 0x03,
    GetTopic = 0x04,
    SetRetention = 0x05,
    CreateTopicWithRetention = 0x06,

    // Cluster status and authentication.
    GetClusterStatus = 0x07,
    Authenticate = 0x08,
    AuthenticateResponse = 0x09,

    // Fetch.
    Fetch = 0x10,
    FetchResponse = 0x11,

    // Streaming: subscriptions and offset commits.
    Subscribe = 0x20,
    Unsubscribe = 0x21,
    CommitOffset = 0x22,
    SubscribeAck = 0x23,
    CommitAck = 0x24,

    // Responses.
    TopicResponse = 0x80,
    ClusterStatusResponse = 0x81,
    ErrorResponse = 0xFF,
}

impl From<u8> for ControlCommand {
    /// Converts a raw byte into the command that has that value.
    ///
    /// The conversion is total: a byte that matches no variant becomes
    /// [`ControlCommand::ErrorResponse`], the same result as for `0xFF`.
    fn from(value: u8) -> Self {
        // Looking the byte up against the discriminants avoids repeating
        // every numeric value a second time here.
        const KNOWN_COMMANDS: [ControlCommand; 19] = [
            ControlCommand::CreateTopic,
            ControlCommand::DeleteTopic,
            ControlCommand::ListTopics,
            ControlCommand::GetTopic,
            ControlCommand::SetRetention,
            ControlCommand::CreateTopicWithRetention,
            ControlCommand::GetClusterStatus,
            ControlCommand::Authenticate,
            ControlCommand::AuthenticateResponse,
            ControlCommand::Fetch,
            ControlCommand::FetchResponse,
            ControlCommand::Subscribe,
            ControlCommand::Unsubscribe,
            ControlCommand::CommitOffset,
            ControlCommand::SubscribeAck,
            ControlCommand::CommitAck,
            ControlCommand::TopicResponse,
            ControlCommand::ClusterStatusResponse,
            ControlCommand::ErrorResponse,
        ];

        KNOWN_COMMANDS
            .iter()
            .copied()
            .find(|command| *command as u8 == value)
            .unwrap_or(Self::ErrorResponse)
    }
}
103
104#[derive(Debug, Clone, Copy)]
105pub struct LwpHeader {
106 pub magic: [u8; 4],
107 pub version: u8,
108 pub flags: u8,
109 pub reserved: [u8; 2],
110 pub header_crc: u32,
111 pub ingest_header: IngestHeader,
112}
113
114impl LwpHeader {
115 pub const SIZE: usize = 44;
116
117 pub fn new(flags: u8, ingest_header: IngestHeader) -> Self {
118 let mut header = Self {
119 magic: LANCE_MAGIC,
120 version: PROTOCOL_VERSION,
121 flags,
122 reserved: [0; 2],
123 header_crc: 0,
124 ingest_header,
125 };
126
127 let mut crc_buf = [0u8; 8];
128 crc_buf[0..4].copy_from_slice(&header.magic);
129 crc_buf[4] = header.version;
130 crc_buf[5] = header.flags;
131 crc_buf[6..8].copy_from_slice(&header.reserved);
132 header.header_crc = crc32c(&crc_buf);
133
134 header
135 }
136
137 pub fn parse(buf: &[u8]) -> Result<Self> {
138 if buf.len() < Self::SIZE {
139 return Err(LanceError::Protocol(
140 "Buffer too small for LWP header".into(),
141 ));
142 }
143
144 if buf[0..4] != LANCE_MAGIC {
145 return Err(LanceError::InvalidMagic);
146 }
147
148 let version = buf[4];
149 if version != PROTOCOL_VERSION {
150 return Err(LanceError::Protocol(format!(
151 "Unsupported protocol version: {}",
152 version
153 )));
154 }
155
156 let stored_crc = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);
157 let computed_crc = crc32c(&buf[0..8]);
158
159 if stored_crc != computed_crc {
160 return Err(LanceError::CrcMismatch {
161 expected: stored_crc,
162 actual: computed_crc,
163 });
164 }
165
166 let mut magic = [0u8; 4];
167 magic.copy_from_slice(&buf[0..4]);
168
169 let ingest_header = IngestHeader::parse(&buf[12..Self::SIZE])?;
170
171 Ok(Self {
172 magic,
173 version: buf[4],
174 flags: buf[5],
175 reserved: [buf[6], buf[7]],
176 header_crc: stored_crc,
177 ingest_header,
178 })
179 }
180
181 pub fn encode(&self) -> [u8; Self::SIZE] {
182 let mut buf = [0u8; Self::SIZE];
183 buf[0..4].copy_from_slice(&self.magic);
184 buf[4] = self.version;
185 buf[5] = self.flags;
186 buf[6..8].copy_from_slice(&self.reserved);
187 buf[8..12].copy_from_slice(&self.header_crc.to_le_bytes());
188 self.ingest_header.encode_into(&mut buf[12..Self::SIZE]);
189 buf
190 }
191
192 #[inline]
193 pub fn has_flag(&self, flag: LwpFlags) -> bool {
194 self.flags & (flag as u8) != 0
195 }
196
197 #[inline]
198 pub fn is_keepalive(&self) -> bool {
199 self.has_flag(LwpFlags::Keepalive)
200 }
201
202 #[inline]
203 pub fn is_batch_mode(&self) -> bool {
204 self.has_flag(LwpFlags::BatchMode)
205 }
206
207 #[inline]
208 pub fn is_compressed(&self) -> bool {
209 self.has_flag(LwpFlags::Compressed)
210 }
211
212 #[inline]
213 pub fn is_control(&self) -> bool {
214 self.has_flag(LwpFlags::Control)
215 }
216
217 pub fn keepalive() -> Self {
219 Self::new(LwpFlags::Keepalive as u8, IngestHeader::default())
220 }
221}
222
223#[derive(Debug, Clone, Copy, Default)]
224pub struct IngestHeader {
225 pub batch_id: u64,
226 pub timestamp_ns: u64,
227 pub record_count: u32,
228 pub payload_length: u32,
229 pub payload_crc: u32,
230 pub topic_id: u32,
231}
232
233impl IngestHeader {
234 pub const SIZE: usize = 32;
235
236 pub fn new(
237 batch_id: u64,
238 timestamp_ns: u64,
239 record_count: u32,
240 payload_length: u32,
241 payload_crc: u32,
242 ) -> Self {
243 Self {
244 batch_id,
245 timestamp_ns,
246 record_count,
247 payload_length,
248 payload_crc,
249 topic_id: 0,
250 }
251 }
252
253 pub fn with_topic(
254 batch_id: u64,
255 timestamp_ns: u64,
256 record_count: u32,
257 payload_length: u32,
258 payload_crc: u32,
259 topic_id: u32,
260 ) -> Self {
261 Self {
262 batch_id,
263 timestamp_ns,
264 record_count,
265 payload_length,
266 payload_crc,
267 topic_id,
268 }
269 }
270
271 pub fn parse(buf: &[u8]) -> Result<Self> {
272 if buf.len() < Self::SIZE {
273 return Err(LanceError::Protocol(
274 "Buffer too small for IngestHeader".into(),
275 ));
276 }
277
278 Ok(Self {
279 batch_id: u64::from_le_bytes([
280 buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
281 ]),
282 timestamp_ns: u64::from_le_bytes([
283 buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
284 ]),
285 record_count: u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]),
286 payload_length: u32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]),
287 payload_crc: u32::from_le_bytes([buf[24], buf[25], buf[26], buf[27]]),
288 topic_id: u32::from_le_bytes([buf[28], buf[29], buf[30], buf[31]]),
289 })
290 }
291
292 pub fn encode_into(&self, buf: &mut [u8]) {
293 buf[0..8].copy_from_slice(&self.batch_id.to_le_bytes());
294 buf[8..16].copy_from_slice(&self.timestamp_ns.to_le_bytes());
295 buf[16..20].copy_from_slice(&self.record_count.to_le_bytes());
296 buf[20..24].copy_from_slice(&self.payload_length.to_le_bytes());
297 buf[24..28].copy_from_slice(&self.payload_crc.to_le_bytes());
298 buf[28..32].copy_from_slice(&self.topic_id.to_le_bytes());
299 }
300
301 pub fn validate_payload(&self, payload: &[u8]) -> Result<()> {
302 if payload.len() != self.payload_length as usize {
303 return Err(LanceError::Protocol(format!(
304 "Payload length mismatch: expected {}, got {}",
305 self.payload_length,
306 payload.len()
307 )));
308 }
309
310 let actual_crc = crc32c(payload);
311 if actual_crc != self.payload_crc {
312 lnc_metrics::increment_crc_failures();
313 return Err(LanceError::CrcMismatch {
314 expected: self.payload_crc,
315 actual: actual_crc,
316 });
317 }
318
319 Ok(())
320 }
321}
322
// Unit tests for header encoding/decoding, payload validation, and the
// byte <-> enum conversions. `unwrap` is allowed here because a failed
// unwrap is itself a test failure.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Every `ControlCommand` variant paired with its wire value.
    const ALL_COMMANDS: [(u8, ControlCommand); 19] = [
        (0x01, ControlCommand::CreateTopic),
        (0x02, ControlCommand::DeleteTopic),
        (0x03, ControlCommand::ListTopics),
        (0x04, ControlCommand::GetTopic),
        (0x05, ControlCommand::SetRetention),
        (0x06, ControlCommand::CreateTopicWithRetention),
        (0x07, ControlCommand::GetClusterStatus),
        (0x08, ControlCommand::Authenticate),
        (0x09, ControlCommand::AuthenticateResponse),
        (0x10, ControlCommand::Fetch),
        (0x11, ControlCommand::FetchResponse),
        (0x20, ControlCommand::Subscribe),
        (0x21, ControlCommand::Unsubscribe),
        (0x22, ControlCommand::CommitOffset),
        (0x23, ControlCommand::SubscribeAck),
        (0x24, ControlCommand::CommitAck),
        (0x80, ControlCommand::TopicResponse),
        (0x81, ControlCommand::ClusterStatusResponse),
        (0xFF, ControlCommand::ErrorResponse),
    ];

    #[test]
    fn test_lwp_header_roundtrip() {
        let ingest = IngestHeader::new(12345, 1_000_000_000, 100, 4096, 0xDEADBEEF);
        let header = LwpHeader::new(LwpFlags::BatchMode as u8, ingest);

        let encoded = header.encode();
        let parsed = LwpHeader::parse(&encoded).unwrap();

        assert_eq!(parsed.magic, LANCE_MAGIC);
        assert_eq!(parsed.version, PROTOCOL_VERSION);
        assert_eq!(parsed.flags, LwpFlags::BatchMode as u8);
        assert_eq!(parsed.reserved, [0u8; 2]);
        assert_eq!(parsed.header_crc, header.header_crc);
        assert_eq!(parsed.ingest_header.batch_id, 12345);
        assert_eq!(parsed.ingest_header.timestamp_ns, 1_000_000_000);
        assert_eq!(parsed.ingest_header.record_count, 100);
        assert_eq!(parsed.ingest_header.payload_length, 4096);
        assert_eq!(parsed.ingest_header.payload_crc, 0xDEADBEEF);
        assert_eq!(parsed.ingest_header.topic_id, 0);
    }

    #[test]
    fn test_lwp_header_roundtrip_with_topic() {
        let ingest = IngestHeader::with_topic(1, 2, 3, 4, 5, 42);
        let header = LwpHeader::new(LwpFlags::Compressed as u8, ingest);

        let parsed = LwpHeader::parse(&header.encode()).unwrap();

        assert!(parsed.is_compressed());
        assert_eq!(parsed.ingest_header.topic_id, 42);
    }

    #[test]
    fn test_invalid_magic() {
        let mut buf = [0u8; LwpHeader::SIZE];
        buf[0..4].copy_from_slice(b"JUNK");

        let result = LwpHeader::parse(&buf);
        assert!(matches!(result, Err(LanceError::InvalidMagic)));
    }

    #[test]
    fn test_buffer_too_small() {
        let encoded = LwpHeader::keepalive().encode();

        // One byte short of a full header must be rejected before any field is read.
        let result = LwpHeader::parse(&encoded[..LwpHeader::SIZE - 1]);
        assert!(matches!(result, Err(LanceError::Protocol(_))));

        let result = IngestHeader::parse(&[0u8; IngestHeader::SIZE - 1]);
        assert!(matches!(result, Err(LanceError::Protocol(_))));
    }

    #[test]
    fn test_unsupported_version() {
        let mut encoded = LwpHeader::keepalive().encode();
        encoded[4] = PROTOCOL_VERSION.wrapping_add(1);

        // The version check runs before the CRC check, so this is a protocol error.
        let result = LwpHeader::parse(&encoded);
        assert!(matches!(result, Err(LanceError::Protocol(_))));
    }

    #[test]
    fn test_header_crc_mismatch() {
        let mut encoded = LwpHeader::keepalive().encode();
        // Flip one flag bit without updating the stored CRC.
        encoded[5] ^= LwpFlags::Compressed as u8;

        let result = LwpHeader::parse(&encoded);
        assert!(matches!(result, Err(LanceError::CrcMismatch { .. })));
    }

    #[test]
    fn test_keepalive() {
        let header = LwpHeader::keepalive();
        assert!(header.is_keepalive());
        assert!(!header.is_batch_mode());
        assert!(!header.is_compressed());
        assert!(!header.is_control());
    }

    #[test]
    fn test_payload_validation() {
        let payload = b"test payload data";
        let crc = crc32c(payload);

        let ingest = IngestHeader::new(1, 0, 1, payload.len() as u32, crc);
        assert!(ingest.validate_payload(payload).is_ok());

        let wrong_crc_ingest = IngestHeader::new(1, 0, 1, payload.len() as u32, 0xBADBAD);
        assert!(wrong_crc_ingest.validate_payload(payload).is_err());
    }

    #[test]
    fn test_payload_length_mismatch() {
        let payload = b"test payload data";
        let crc = crc32c(payload);

        let wrong_len_ingest = IngestHeader::new(1, 0, 1, payload.len() as u32 + 1, crc);
        let result = wrong_len_ingest.validate_payload(payload);
        assert!(matches!(result, Err(LanceError::Protocol(_))));
    }

    #[test]
    fn test_lwp_flags_from_u8() {
        assert_eq!(LwpFlags::from(0x01), LwpFlags::Compressed);
        assert_eq!(LwpFlags::from(0x02), LwpFlags::Encrypted);
        assert_eq!(LwpFlags::from(0x04), LwpFlags::BatchMode);
        assert_eq!(LwpFlags::from(0x08), LwpFlags::Ack);
        assert_eq!(LwpFlags::from(0x10), LwpFlags::Backpressure);
        assert_eq!(LwpFlags::from(0x20), LwpFlags::Keepalive);
        assert_eq!(LwpFlags::from(0x40), LwpFlags::Control);

        // Zero, combined bits, and the unassigned high bit all map to `None`.
        assert_eq!(LwpFlags::from(0x00), LwpFlags::None);
        assert_eq!(LwpFlags::from(0x03), LwpFlags::None);
        assert_eq!(LwpFlags::from(0x80), LwpFlags::None);
    }

    #[test]
    fn test_control_command_topic_management() {
        assert_eq!(ControlCommand::from(0x01), ControlCommand::CreateTopic);
        assert_eq!(ControlCommand::from(0x02), ControlCommand::DeleteTopic);
        assert_eq!(ControlCommand::from(0x03), ControlCommand::ListTopics);
        assert_eq!(ControlCommand::from(0x04), ControlCommand::GetTopic);
        assert_eq!(ControlCommand::from(0x05), ControlCommand::SetRetention);
        assert_eq!(
            ControlCommand::from(0x06),
            ControlCommand::CreateTopicWithRetention
        );
    }

    #[test]
    fn test_control_command_cluster_status() {
        assert_eq!(ControlCommand::from(0x07), ControlCommand::GetClusterStatus);
        assert_eq!(
            ControlCommand::from(0x81),
            ControlCommand::ClusterStatusResponse
        );
    }

    #[test]
    fn test_control_command_fetch() {
        assert_eq!(ControlCommand::from(0x10), ControlCommand::Fetch);
        assert_eq!(ControlCommand::from(0x11), ControlCommand::FetchResponse);
    }

    #[test]
    fn test_control_command_streaming() {
        assert_eq!(ControlCommand::from(0x20), ControlCommand::Subscribe);
        assert_eq!(ControlCommand::from(0x21), ControlCommand::Unsubscribe);
        assert_eq!(ControlCommand::from(0x22), ControlCommand::CommitOffset);
        assert_eq!(ControlCommand::from(0x23), ControlCommand::SubscribeAck);
        assert_eq!(ControlCommand::from(0x24), ControlCommand::CommitAck);
    }

    #[test]
    fn test_control_command_responses() {
        assert_eq!(ControlCommand::from(0x80), ControlCommand::TopicResponse);
        assert_eq!(ControlCommand::from(0xFF), ControlCommand::ErrorResponse);
    }

    #[test]
    fn test_control_command_authentication() {
        assert_eq!(ControlCommand::from(0x08), ControlCommand::Authenticate);
        assert_eq!(
            ControlCommand::from(0x09),
            ControlCommand::AuthenticateResponse
        );
        assert_eq!(ControlCommand::Authenticate as u8, 0x08);
        assert_eq!(ControlCommand::AuthenticateResponse as u8, 0x09);
    }

    #[test]
    fn test_control_command_unknown_defaults_to_error() {
        assert_eq!(ControlCommand::from(0x00), ControlCommand::ErrorResponse);
        assert_eq!(ControlCommand::from(0x99), ControlCommand::ErrorResponse);
        assert_eq!(ControlCommand::from(0xFE), ControlCommand::ErrorResponse);

        // Every byte that is not a defined command must fall back to ErrorResponse.
        for byte in 0..=u8::MAX {
            if ALL_COMMANDS.iter().any(|&(code, _)| code == byte) {
                continue;
            }
            assert_eq!(
                ControlCommand::from(byte),
                ControlCommand::ErrorResponse,
                "byte {:#04x}",
                byte
            );
        }
    }

    #[test]
    fn test_control_command_roundtrip() {
        // Check both directions for every variant: enum -> byte -> enum.
        for &(code, command) in ALL_COMMANDS.iter() {
            assert_eq!(command as u8, code);
            assert_eq!(ControlCommand::from(code), command);
        }
    }
}