1#![allow(dead_code)]
2
3use std::ops::{Bound, Range};
38
39use bytes::{BufMut, Bytes, BytesMut};
40use common::BytesRange;
41use common::serde::key_prefix::{KeyPrefix, RecordTag};
42use common::serde::terminated_bytes;
43use common::serde::varint::var_u64;
44
45use crate::error::Error;
46use crate::model::SegmentId;
47use crate::segment::LogSegment;
48
49impl From<common::serde::DeserializeError> for Error {
50 fn from(err: common::serde::DeserializeError) -> Self {
51 Error::Encoding(err.message)
52 }
53}
54
/// Version byte that starts every key built in this module; it is passed to
/// `KeyPrefix::new` when writing and to `KeyPrefix::from_bytes_versioned`
/// when reading.
pub const KEY_VERSION: u8 = 0x01;
57
/// Fixed two-byte key: [`KEY_VERSION`] followed by the literal `0x02`.
///
/// NOTE(review): `0x02` equals the id of `RecordType::SeqBlock`, but the unit
/// tests in this file expect `RecordType::SeqBlock.tag().as_byte()` to be
/// `0x20`. This key therefore does not appear to use the same tag-byte layout
/// as the other keys built here. Confirm whether that is intentional before
/// changing the value, since doing so would change the bytes of this key.
pub const SEQ_BLOCK_KEY: [u8; 2] = [KEY_VERSION, 0x02];

/// Kinds of records handled by this module.
///
/// The explicit discriminant of each variant is the numeric id returned by
/// `RecordType::id` and accepted by `RecordType::from_id`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordType {
    /// Record keyed by `LogEntryKey` (id `0x01`).
    LogEntry = 0x01,
    /// Sequence-block record (id `0x02`).
    SeqBlock = 0x02,
    /// Record keyed by `SegmentMetaKey` (id `0x03`).
    SegmentMeta = 0x03,
    /// Record keyed by `ListingEntryKey` (id `0x04`).
    ListingEntry = 0x04,
}
77
78impl RecordType {
79 pub fn id(&self) -> u8 {
81 *self as u8
82 }
83
84 pub fn from_id(id: u8) -> Result<Self, Error> {
86 match id {
87 0x01 => Ok(RecordType::LogEntry),
88 0x02 => Ok(RecordType::SeqBlock),
89 0x03 => Ok(RecordType::SegmentMeta),
90 0x04 => Ok(RecordType::ListingEntry),
91 _ => Err(Error::Encoding(format!(
92 "invalid record type: 0x{:02x}",
93 id
94 ))),
95 }
96 }
97
98 pub fn tag(&self) -> RecordTag {
102 RecordTag::new(self.id(), 0)
103 }
104
105 pub fn prefix(&self) -> KeyPrefix {
107 KeyPrefix::new(KEY_VERSION, self.tag())
108 }
109}
110
/// Key of a single log entry: a segment id, the user key bytes, and the
/// entry's sequence number.
///
/// `serialize` writes, in order: the `LogEntry` key prefix, `segment_id` as a
/// big-endian `u32`, the terminated-bytes encoding of `key`, and the var-u64
/// encoding of `sequence` relative to the segment's start sequence.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogEntryKey {
    /// Segment id component of the key.
    pub segment_id: SegmentId,
    /// User key bytes.
    pub key: Bytes,
    /// Absolute sequence number; only its offset from the segment's start
    /// sequence is written to the serialized key.
    pub sequence: u64,
}
136
137impl LogEntryKey {
138 pub fn new(segment_id: SegmentId, key: Bytes, sequence: u64) -> Self {
140 Self {
141 segment_id,
142 key,
143 sequence,
144 }
145 }
146
147 pub fn serialize(&self, segment_start_seq: u64) -> Bytes {
152 let relative_seq = self.sequence - segment_start_seq;
153 let mut buf = BytesMut::new();
154 RecordType::LogEntry.prefix().write_to(&mut buf);
155 buf.put_u32(self.segment_id);
156 terminated_bytes::serialize(&self.key, &mut buf);
157 var_u64::serialize(relative_seq, &mut buf);
158 buf.freeze()
159 }
160
161 pub fn deserialize(data: &[u8], segment_start_seq: u64) -> Result<Self, Error> {
167 let prefix = KeyPrefix::from_bytes_versioned(data, KEY_VERSION)?;
168 let record_type = RecordType::from_id(prefix.tag().record_type())?;
169 if record_type != RecordType::LogEntry {
170 return Err(Error::Encoding(format!(
171 "invalid record type: expected LogEntry, got {:?}",
172 record_type
173 )));
174 }
175
176 if data.len() < 6 {
177 return Err(Error::Encoding(
178 "buffer too short for log entry key".to_string(),
179 ));
180 }
181
182 let segment_id = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
183
184 let mut buf = &data[6..];
185 let key = terminated_bytes::deserialize(&mut buf)?;
186 let relative_seq = var_u64::deserialize(&mut buf)?;
187 let sequence = segment_start_seq + relative_seq;
188
189 Ok(LogEntryKey {
190 segment_id,
191 key,
192 sequence,
193 })
194 }
195
196 pub fn scan_range(segment: &LogSegment, key: &[u8], seq_range: Range<u64>) -> BytesRange {
202 let start_key = Self::build_scan_key(segment, key, seq_range.start);
203 let end_key = Self::build_scan_key(segment, key, seq_range.end);
204 BytesRange::new(Bound::Included(start_key), Bound::Excluded(end_key))
205 }
206
207 fn build_scan_key(segment: &LogSegment, key: &[u8], seq: u64) -> Bytes {
209 let relative_seq = seq.saturating_sub(segment.meta().start_seq);
210 let mut buf = BytesMut::new();
211 RecordType::LogEntry.prefix().write_to(&mut buf);
212 buf.put_u32(segment.id());
213 terminated_bytes::serialize(key, &mut buf);
214 var_u64::serialize(relative_seq, &mut buf);
215 buf.freeze()
216 }
217}
218
/// Key of a segment's metadata record.
///
/// Serialized as the `SegmentMeta` key prefix followed by `segment_id` as a
/// big-endian `u32` (6 bytes in total).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentMetaKey {
    /// Segment id component of the key.
    pub segment_id: SegmentId,
}
229
230impl SegmentMetaKey {
231 pub fn new(segment_id: SegmentId) -> Self {
233 Self { segment_id }
234 }
235
236 pub fn serialize(&self) -> Bytes {
238 let mut buf = BytesMut::with_capacity(6);
239 RecordType::SegmentMeta.prefix().write_to(&mut buf);
240 buf.put_u32(self.segment_id);
241 buf.freeze()
242 }
243
244 pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
246 let prefix = KeyPrefix::from_bytes_versioned(data, KEY_VERSION)?;
247 let record_type = RecordType::from_id(prefix.tag().record_type())?;
248 if record_type != RecordType::SegmentMeta {
249 return Err(Error::Encoding(format!(
250 "invalid record type: expected SegmentMeta, got {:?}",
251 record_type
252 )));
253 }
254
255 if data.len() < 6 {
256 return Err(Error::Encoding(
257 "buffer too short for SegmentMeta key".to_string(),
258 ));
259 }
260
261 let segment_id = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
262
263 Ok(SegmentMetaKey { segment_id })
264 }
265
266 pub fn scan_range(range: Range<SegmentId>) -> BytesRange {
268 let start = Bound::Included(SegmentMetaKey::new(range.start).serialize());
269 let end = Bound::Excluded(SegmentMetaKey::new(range.end).serialize());
270 BytesRange::new(start, end)
271 }
272}
273
/// Metadata value stored for a segment.
///
/// Serialized as 16 bytes: `start_seq` as a big-endian `u64` followed by
/// `start_time_ms` as a big-endian `i64`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentMeta {
    /// Start sequence of the segment; log entry keys store their sequence
    /// relative to this value.
    pub start_seq: u64,
    /// Start time of the segment in milliseconds.
    /// NOTE(review): presumably milliseconds since the Unix epoch — confirm.
    pub start_time_ms: i64,
}
286
287impl SegmentMeta {
288 pub fn new(start_seq: u64, start_time_ms: i64) -> Self {
290 Self {
291 start_seq,
292 start_time_ms,
293 }
294 }
295
296 pub fn serialize(&self) -> Bytes {
298 let mut buf = BytesMut::with_capacity(16);
299 buf.put_u64(self.start_seq);
300 buf.put_i64(self.start_time_ms);
301 buf.freeze()
302 }
303
304 pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
306 if data.len() < 16 {
307 return Err(Error::Encoding(format!(
308 "buffer too short for SegmentMeta value: need 16 bytes, got {}",
309 data.len()
310 )));
311 }
312
313 let start_seq = u64::from_be_bytes([
314 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
315 ]);
316 let start_time_ms = i64::from_be_bytes([
317 data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
318 ]);
319
320 Ok(SegmentMeta {
321 start_seq,
322 start_time_ms,
323 })
324 }
325}
326
/// Key of a listing entry: a segment id plus raw key bytes.
///
/// Serialized as the `ListingEntry` key prefix, `segment_id` as a big-endian
/// `u32`, and then the key bytes written as-is (no terminator), so on decode
/// the key is everything after the first 6 bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ListingEntryKey {
    /// Segment id component of the key.
    pub segment_id: SegmentId,
    /// Raw key bytes; may be empty.
    pub key: Bytes,
}
346
347impl ListingEntryKey {
348 pub fn new(segment_id: SegmentId, key: Bytes) -> Self {
350 Self { segment_id, key }
351 }
352
353 pub fn serialize(&self) -> Bytes {
355 let mut buf = BytesMut::new();
356 RecordType::ListingEntry.prefix().write_to(&mut buf);
357 buf.put_u32(self.segment_id);
358 buf.put_slice(&self.key);
359 buf.freeze()
360 }
361
362 pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
364 let prefix = KeyPrefix::from_bytes_versioned(data, KEY_VERSION)?;
365 let record_type = RecordType::from_id(prefix.tag().record_type())?;
366 if record_type != RecordType::ListingEntry {
367 return Err(Error::Encoding(format!(
368 "invalid record type: expected ListingEntry, got {:?}",
369 record_type
370 )));
371 }
372
373 if data.len() < 6 {
374 return Err(Error::Encoding(
375 "buffer too short for listing entry key".to_string(),
376 ));
377 }
378
379 let segment_id = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
380 let key = Bytes::copy_from_slice(&data[6..]);
381
382 Ok(ListingEntryKey { segment_id, key })
383 }
384
385 pub fn scan_range(range: Range<SegmentId>) -> BytesRange {
390 let start = Bound::Included(Self::segment_prefix(range.start));
391 let end = Bound::Excluded(Self::segment_prefix(range.end));
392 BytesRange::new(start, end)
393 }
394
395 fn segment_prefix(segment_id: SegmentId) -> Bytes {
397 let mut buf = BytesMut::with_capacity(6);
398 RecordType::ListingEntry.prefix().write_to(&mut buf);
399 buf.put_u32(segment_id);
400 buf.freeze()
401 }
402}
403
/// Value stored for a listing entry. It carries no data: it serializes to
/// zero bytes and `deserialize` rejects any non-empty input.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ListingEntryValue;
410
411impl ListingEntryValue {
412 pub fn new() -> Self {
414 Self
415 }
416
417 pub fn serialize(&self) -> Bytes {
419 Bytes::new()
420 }
421
422 pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
424 if !data.is_empty() {
425 return Err(Error::Encoding(format!(
426 "listing entry value should be empty, got {} bytes",
427 data.len()
428 )));
429 }
430 Ok(ListingEntryValue)
431 }
432}
433
#[cfg(test)]
mod tests {
    // Unit tests for the key/value encodings in this module. Each test follows
    // a given / when / then layout.
    use super::*;
    use std::ops::RangeBounds;

    #[test]
    fn should_convert_record_type_to_id_and_back() {
        // given
        let log_entry = RecordType::LogEntry;
        let seq_block = RecordType::SeqBlock;
        let segment_meta = RecordType::SegmentMeta;
        let listing_entry = RecordType::ListingEntry;

        // when/then
        assert_eq!(log_entry.id(), 0x01);
        assert_eq!(seq_block.id(), 0x02);
        assert_eq!(segment_meta.id(), 0x03);
        assert_eq!(listing_entry.id(), 0x04);
        assert_eq!(RecordType::from_id(0x01).unwrap(), RecordType::LogEntry);
        assert_eq!(RecordType::from_id(0x02).unwrap(), RecordType::SeqBlock);
        assert_eq!(RecordType::from_id(0x03).unwrap(), RecordType::SegmentMeta);
        assert_eq!(RecordType::from_id(0x04).unwrap(), RecordType::ListingEntry);
    }

    #[test]
    fn should_reject_invalid_record_type() {
        // given
        let invalid_byte = 0x99;

        // when
        let result = RecordType::from_id(invalid_byte);

        // then
        assert!(result.is_err());
    }

    #[test]
    fn should_serialize_and_deserialize_log_entry_key() {
        // given
        let segment_start_seq = 10000;
        let key = LogEntryKey::new(42, Bytes::from("test_key"), 12345);

        // when
        let serialized = key.serialize(segment_start_seq);
        let deserialized = LogEntryKey::deserialize(&serialized, segment_start_seq).unwrap();

        // then
        assert_eq!(deserialized.segment_id, 42);
        assert_eq!(deserialized.key, Bytes::from("test_key"));
        assert_eq!(deserialized.sequence, 12345);
    }

    #[test]
    fn should_serialize_log_entry_key_with_correct_structure() {
        // given
        let segment_start_seq = 0;
        let key = LogEntryKey::new(1, Bytes::from("k"), 100);

        // when
        let serialized = key.serialize(segment_start_seq);

        // then
        // version (1) + tag (1) + segment id (4) + key (1) + terminator (1) + varint (2)
        assert_eq!(serialized.len(), 10);
        assert_eq!(serialized[0], KEY_VERSION);
        assert_eq!(serialized[1], RecordType::LogEntry.tag().as_byte());
        assert_eq!(serialized[1], 0x10);
        assert_eq!(&serialized[2..6], &[0, 0, 0, 1]);
        assert_eq!(serialized[6], b'k');
        assert_eq!(serialized[7], 0x00);
        assert_eq!(&serialized[8..10], &[0x10, 0x64]);
    }

    #[test]
    fn should_serialize_relative_sequence() {
        // given
        let segment_start_seq = 1000;
        let key = LogEntryKey::new(1, Bytes::from("k"), 1005);

        // when
        let serialized = key.serialize(segment_start_seq);

        // then: only the offset 5 from the segment start is stored
        assert_eq!(serialized.len(), 9);
        assert_eq!(serialized[8], 0x05);
    }

    #[test]
    fn should_order_log_entries_by_segment_then_key_then_sequence() {
        // given
        let segment_start_seq = 0;
        let key1 = LogEntryKey::new(0, Bytes::from("a"), 1);
        let key2 = LogEntryKey::new(0, Bytes::from("a"), 2);
        let key3 = LogEntryKey::new(0, Bytes::from("b"), 1);
        let segment1_start_seq = 100;
        let key4 = LogEntryKey::new(1, Bytes::from("a"), 101);

        // when
        let s1 = key1.serialize(segment_start_seq);
        let s2 = key2.serialize(segment_start_seq);
        let s3 = key3.serialize(segment_start_seq);
        let s4 = key4.serialize(segment1_start_seq);

        // then
        assert!(s1 < s2, "same segment/key, seq 1 < seq 2");
        assert!(s2 < s3, "same segment, key 'a' < key 'b'");
        assert!(s3 < s4, "segment 0 < segment 1");
    }

    #[test]
    fn should_create_record_tag() {
        // given/when
        let log_entry_tag = RecordType::LogEntry.tag();
        let seq_block_tag = RecordType::SeqBlock.tag();
        let segment_meta_tag = RecordType::SegmentMeta.tag();
        let listing_entry_tag = RecordType::ListingEntry.tag();

        // then
        assert_eq!(log_entry_tag.as_byte(), 0x10);
        assert_eq!(seq_block_tag.as_byte(), 0x20);
        assert_eq!(segment_meta_tag.as_byte(), 0x30);
        assert_eq!(listing_entry_tag.as_byte(), 0x40);
    }

    #[test]
    fn should_fail_deserialize_log_entry_key_too_short() {
        // given: one byte short of the 6-byte header
        let data = vec![KEY_VERSION, RecordType::LogEntry.tag().as_byte(), 0, 0, 0];

        // when
        let result = LogEntryKey::deserialize(&data, 0);

        // then
        assert!(result.is_err());
    }

    #[test]
    fn should_serialize_and_deserialize_segment_meta_key() {
        // given
        let key = SegmentMetaKey::new(42);

        // when
        let serialized = key.serialize();
        let deserialized = SegmentMetaKey::deserialize(&serialized).unwrap();

        // then
        assert_eq!(serialized.len(), 6);
        assert_eq!(serialized[0], KEY_VERSION);
        assert_eq!(serialized[1], RecordType::SegmentMeta.tag().as_byte());
        assert_eq!(&serialized[2..6], &[0, 0, 0, 42]);
        assert_eq!(deserialized, key);
    }

    #[test]
    fn should_fail_deserialize_segment_meta_key_too_short() {
        // given: one byte short of the 6-byte key
        let data = vec![
            KEY_VERSION,
            RecordType::SegmentMeta.tag().as_byte(),
            0,
            0,
            0,
        ];

        // when
        let result = SegmentMetaKey::deserialize(&data);

        // then
        assert!(result.is_err());
    }

    #[test]
    fn should_create_segment_meta_scan_range() {
        // given
        let range = 1..3;

        // when
        let scan_range = SegmentMetaKey::scan_range(range);

        // then
        let start_key = SegmentMetaKey::new(1).serialize();
        let end_key = SegmentMetaKey::new(3).serialize();
        assert_eq!(scan_range.start_bound(), Bound::Included(&start_key));
        assert_eq!(scan_range.end_bound(), Bound::Excluded(&end_key));
    }

    #[test]
    fn should_serialize_and_deserialize_segment_meta() {
        // given: a negative time exercises the signed encoding
        let meta = SegmentMeta::new(12345, -1);

        // when
        let serialized = meta.serialize();
        let deserialized = SegmentMeta::deserialize(&serialized).unwrap();

        // then
        assert_eq!(serialized.len(), 16);
        assert_eq!(&serialized[0..8], &[0, 0, 0, 0, 0, 0, 0x30, 0x39]);
        assert_eq!(&serialized[8..16], &[0xff; 8]);
        assert_eq!(deserialized, meta);
    }

    #[test]
    fn should_fail_deserialize_segment_meta_too_short() {
        // given: one byte short of the 16-byte value
        let data = vec![0u8; 15];

        // when
        let result = SegmentMeta::deserialize(&data);

        // then
        assert!(result.is_err());
    }

    #[test]
    fn should_reject_key_with_mismatched_record_type() {
        // given
        let listing_key = ListingEntryKey::new(1, Bytes::from("k")).serialize();
        let segment_meta_key = SegmentMetaKey::new(1).serialize();

        // when/then: each decoder only accepts its own record type
        assert!(LogEntryKey::deserialize(&listing_key, 0).is_err());
        assert!(SegmentMetaKey::deserialize(&listing_key).is_err());
        assert!(ListingEntryKey::deserialize(&segment_meta_key).is_err());
    }

    #[test]
    fn should_serialize_and_deserialize_listing_entry_key() {
        // given
        let key = ListingEntryKey::new(42, Bytes::from("test_key"));

        // when
        let serialized = key.serialize();
        let deserialized = ListingEntryKey::deserialize(&serialized).unwrap();

        // then
        assert_eq!(deserialized.segment_id, 42);
        assert_eq!(deserialized.key, Bytes::from("test_key"));
    }

    #[test]
    fn should_serialize_listing_entry_key_with_correct_structure() {
        // given
        let key = ListingEntryKey::new(1, Bytes::from("k"));

        // when
        let serialized = key.serialize();

        // then
        // version (1) + tag (1) + segment id (4) + key (1)
        assert_eq!(serialized.len(), 7);
        assert_eq!(serialized[0], KEY_VERSION);
        assert_eq!(serialized[1], RecordType::ListingEntry.tag().as_byte());
        assert_eq!(serialized[1], 0x40);
        assert_eq!(&serialized[2..6], &[0, 0, 0, 1]);
        assert_eq!(serialized[6], b'k');
    }

    #[test]
    fn should_serialize_listing_entry_key_with_empty_key() {
        // given
        let key = ListingEntryKey::new(1, Bytes::new());

        // when
        let serialized = key.serialize();
        let deserialized = ListingEntryKey::deserialize(&serialized).unwrap();

        // then
        assert_eq!(serialized.len(), 6);
        assert_eq!(deserialized.segment_id, 1);
        assert_eq!(deserialized.key, Bytes::new());
    }

    #[test]
    fn should_order_listing_entries_by_segment_then_key() {
        // given
        let key1 = ListingEntryKey::new(0, Bytes::from("a"));
        let key2 = ListingEntryKey::new(0, Bytes::from("b"));
        let key3 = ListingEntryKey::new(1, Bytes::from("a"));

        // when
        let s1 = key1.serialize();
        let s2 = key2.serialize();
        let s3 = key3.serialize();

        // then
        assert!(s1 < s2, "same segment, key 'a' < key 'b'");
        assert!(s2 < s3, "segment 0 < segment 1");
    }

    #[test]
    fn should_create_listing_entry_scan_range() {
        // given
        let range = 1..3;

        // when
        let scan_range = ListingEntryKey::scan_range(range);

        // then
        let start_key = ListingEntryKey::new(1, Bytes::new()).serialize();
        let end_key = ListingEntryKey::new(3, Bytes::new()).serialize();

        assert_eq!(scan_range.start_bound(), Bound::Included(&start_key));
        assert_eq!(scan_range.end_bound(), Bound::Excluded(&end_key));
    }

    #[test]
    fn should_serialize_and_deserialize_listing_entry_value() {
        // given
        let value = ListingEntryValue::new();

        // when
        let serialized = value.serialize();
        let deserialized = ListingEntryValue::deserialize(&serialized).unwrap();

        // then
        assert!(serialized.is_empty());
        assert_eq!(deserialized, ListingEntryValue);
    }

    #[test]
    fn should_fail_deserialize_listing_entry_value_with_data() {
        // given
        let data = vec![0x01, 0x02];

        // when
        let result = ListingEntryValue::deserialize(&data);

        // then
        assert!(result.is_err());
    }

    #[test]
    fn should_fail_deserialize_listing_entry_key_too_short() {
        // given: one byte short of the 6-byte header
        let data = vec![
            KEY_VERSION,
            RecordType::ListingEntry.tag().as_byte(),
            0,
            0,
            0,
        ];

        // when
        let result = ListingEntryKey::deserialize(&data);

        // then
        assert!(result.is_err());
    }

    mod proptests {
        use proptest::prelude::*;

        use super::*;

        proptest! {
            // Byte-wise ordering of encoded keys must match numeric ordering
            // of their sequences.
            #[test]
            fn should_preserve_sequence_ordering(a: u64, b: u64) {
                let segment_start_seq = 0;
                let key_a = LogEntryKey::new(0, Bytes::from("key"), a);
                let key_b = LogEntryKey::new(0, Bytes::from("key"), b);

                let enc_a = key_a.serialize(segment_start_seq);
                let enc_b = key_b.serialize(segment_start_seq);

                prop_assert_eq!(
                    a.cmp(&b),
                    enc_a.cmp(&enc_b),
                    "ordering mismatch: a={}, b={}, enc_a={:?}, enc_b={:?}",
                    a, b, enc_a.as_ref(), enc_b.as_ref()
                );
            }

            // Any listing key whose segment id lies in `start..end` must fall
            // inside the scan range for `start..end`.
            #[test]
            fn should_include_listing_entry_in_scan_range(
                start in 0u32..1000,
                range_size in 1u32..100,
                offset in 0u32..100,
                key_bytes in prop::collection::vec(any::<u8>(), 1..100),
            ) {
                let end = start.saturating_add(range_size);
                let segment_id = start.saturating_add(offset % range_size);

                let key = ListingEntryKey::new(segment_id, Bytes::from(key_bytes));
                let serialized = key.serialize();

                let scan_range = ListingEntryKey::scan_range(start..end);

                prop_assert!(
                    scan_range.contains(&serialized),
                    "listing entry for segment {} with key should be in range {}..{}, \
                     serialized={:?}, range_start={:?}, range_end={:?}",
                    segment_id, start, end,
                    serialized.as_ref(),
                    scan_range.start_bound(),
                    scan_range.end_bound()
                );
            }
        }
    }
}