1#![allow(dead_code)]
2
3use std::ops::{Bound, Range};
38
39use bytes::{BufMut, Bytes, BytesMut};
40use common::BytesRange;
41use common::serde::key_prefix::{KeyPrefix, RecordTag};
42use common::serde::terminated_bytes;
43use common::serde::varint::var_u64;
44
45use crate::error::Error;
46use crate::model::SegmentId;
47use crate::segment::LogSegment;
48
49impl From<common::serde::DeserializeError> for Error {
50 fn from(err: common::serde::DeserializeError) -> Self {
51 Error::Encoding(err.message)
52 }
53}
54
/// Version byte written as the first byte of every key produced by this module.
pub const KEY_VERSION: u8 = 0x01;

/// Fixed two-byte key for the sequence-block record: `[KEY_VERSION, 0x02]`.
///
/// NOTE(review): the second byte is the raw `SeqBlock` id (0x02), whereas
/// `RecordType::SeqBlock.prefix()` writes the tag byte 0x20 (see the
/// `should_create_record_tag` test), so the two encodings differ. These bytes
/// are persisted; confirm the intent before changing either side.
pub const SEQ_BLOCK_KEY: [u8; 2] = [KEY_VERSION, RecordType::SeqBlock as u8];

/// Kind of record stored under a key.
///
/// The explicit discriminants are the record type ids used in key prefixes;
/// `RecordType::from_id` is the inverse of `RecordType::id`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordType {
    /// A log entry, keyed by `LogEntryKey`.
    LogEntry = 0x01,
    /// The sequence-block record; `SEQ_BLOCK_KEY` embeds this id.
    SeqBlock = 0x02,
    /// Segment metadata, keyed by `SegmentMetaKey` with a `SegmentMeta` value.
    SegmentMeta = 0x03,
    /// A listing entry, keyed by `ListingEntryKey` with a `ListingEntryValue`.
    ListingEntry = 0x04,
}
77
78impl RecordType {
79 pub fn id(&self) -> u8 {
81 *self as u8
82 }
83
84 pub fn from_id(id: u8) -> Result<Self, Error> {
86 match id {
87 0x01 => Ok(RecordType::LogEntry),
88 0x02 => Ok(RecordType::SeqBlock),
89 0x03 => Ok(RecordType::SegmentMeta),
90 0x04 => Ok(RecordType::ListingEntry),
91 _ => Err(Error::Encoding(format!(
92 "invalid record type: 0x{:02x}",
93 id
94 ))),
95 }
96 }
97
98 pub fn tag(&self) -> RecordTag {
102 RecordTag::new(self.id(), 0)
103 }
104
105 pub fn prefix(&self) -> KeyPrefix {
107 KeyPrefix::new(KEY_VERSION, self.tag())
108 }
109}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct LogEntryKey {
129 pub segment_id: SegmentId,
131 pub key: Bytes,
133 pub sequence: u64,
135}
136
137impl LogEntryKey {
138 pub fn new(segment_id: SegmentId, key: Bytes, sequence: u64) -> Self {
140 Self {
141 segment_id,
142 key,
143 sequence,
144 }
145 }
146
147 pub fn serialize(&self, segment_start_seq: u64) -> Bytes {
152 let relative_seq = self.sequence - segment_start_seq;
153 let mut buf = BytesMut::new();
154 RecordType::LogEntry.prefix().write_to(&mut buf);
155 buf.put_u32(self.segment_id);
156 terminated_bytes::serialize(&self.key, &mut buf);
157 var_u64::serialize(relative_seq, &mut buf);
158 buf.freeze()
159 }
160
161 pub fn deserialize(data: &[u8], segment_start_seq: u64) -> Result<Self, Error> {
167 let prefix = KeyPrefix::from_bytes_versioned(data, KEY_VERSION)?;
168 let record_type = RecordType::from_id(prefix.tag().record_type())?;
169 if record_type != RecordType::LogEntry {
170 return Err(Error::Encoding(format!(
171 "invalid record type: expected LogEntry, got {:?}",
172 record_type
173 )));
174 }
175
176 if data.len() < 6 {
177 return Err(Error::Encoding(
178 "buffer too short for log entry key".to_string(),
179 ));
180 }
181
182 let segment_id = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
183
184 let mut buf = &data[6..];
185 let key = terminated_bytes::deserialize(&mut buf)?;
186 let relative_seq = var_u64::deserialize(&mut buf)?;
187 let sequence = segment_start_seq + relative_seq;
188
189 Ok(LogEntryKey {
190 segment_id,
191 key,
192 sequence,
193 })
194 }
195
196 pub fn scan_range(segment: &LogSegment, key: &[u8], seq_range: Range<u64>) -> BytesRange {
202 let start_key = Self::build_scan_key(segment, key, seq_range.start);
203 let end_key = Self::build_scan_key(segment, key, seq_range.end);
204 BytesRange::new(Bound::Included(start_key), Bound::Excluded(end_key))
205 }
206
207 fn build_scan_key(segment: &LogSegment, key: &[u8], seq: u64) -> Bytes {
209 let relative_seq = seq.saturating_sub(segment.meta().start_seq);
210 let mut buf = BytesMut::new();
211 RecordType::LogEntry.prefix().write_to(&mut buf);
212 buf.put_u32(segment.id());
213 terminated_bytes::serialize(key, &mut buf);
214 var_u64::serialize(relative_seq, &mut buf);
215 buf.freeze()
216 }
217}
218
219#[derive(Debug, Clone, PartialEq, Eq)]
225pub struct SegmentMetaKey {
226 pub segment_id: SegmentId,
228}
229
230impl SegmentMetaKey {
231 pub fn new(segment_id: SegmentId) -> Self {
233 Self { segment_id }
234 }
235
236 pub fn serialize(&self) -> Bytes {
238 let mut buf = BytesMut::with_capacity(6);
239 RecordType::SegmentMeta.prefix().write_to(&mut buf);
240 buf.put_u32(self.segment_id);
241 buf.freeze()
242 }
243
244 pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
246 let prefix = KeyPrefix::from_bytes_versioned(data, KEY_VERSION)?;
247 let record_type = RecordType::from_id(prefix.tag().record_type())?;
248 if record_type != RecordType::SegmentMeta {
249 return Err(Error::Encoding(format!(
250 "invalid record type: expected SegmentMeta, got {:?}",
251 record_type
252 )));
253 }
254
255 if data.len() < 6 {
256 return Err(Error::Encoding(
257 "buffer too short for SegmentMeta key".to_string(),
258 ));
259 }
260
261 let segment_id = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
262
263 Ok(SegmentMetaKey { segment_id })
264 }
265
266 pub fn scan_range(range: Range<SegmentId>) -> BytesRange {
268 let start = Bound::Included(SegmentMetaKey::new(range.start).serialize());
269 let end = Bound::Excluded(SegmentMetaKey::new(range.end).serialize());
270 BytesRange::new(start, end)
271 }
272}
273
274#[derive(Debug, Clone, PartialEq, Eq)]
280pub struct SegmentMeta {
281 pub start_seq: u64,
283 pub start_time_ms: i64,
285}
286
287impl SegmentMeta {
288 pub fn new(start_seq: u64, start_time_ms: i64) -> Self {
290 Self {
291 start_seq,
292 start_time_ms,
293 }
294 }
295
296 pub fn serialize(&self) -> Bytes {
298 let mut buf = BytesMut::with_capacity(16);
299 buf.put_u64(self.start_seq);
300 buf.put_i64(self.start_time_ms);
301 buf.freeze()
302 }
303
304 pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
306 if data.len() < 16 {
307 return Err(Error::Encoding(format!(
308 "buffer too short for SegmentMeta value: need 16 bytes, got {}",
309 data.len()
310 )));
311 }
312
313 let start_seq = u64::from_be_bytes([
314 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
315 ]);
316 let start_time_ms = i64::from_be_bytes([
317 data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
318 ]);
319
320 Ok(SegmentMeta {
321 start_seq,
322 start_time_ms,
323 })
324 }
325}
326
327#[derive(Debug, Clone, PartialEq, Eq)]
340pub struct ListingEntryKey {
341 pub segment_id: SegmentId,
343 pub key: Bytes,
345}
346
347impl ListingEntryKey {
348 pub fn new(segment_id: SegmentId, key: Bytes) -> Self {
350 Self { segment_id, key }
351 }
352
353 pub fn serialize(&self) -> Bytes {
355 let mut buf = BytesMut::new();
356 RecordType::ListingEntry.prefix().write_to(&mut buf);
357 buf.put_u32(self.segment_id);
358 buf.put_slice(&self.key);
359 buf.freeze()
360 }
361
362 pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
364 let prefix = KeyPrefix::from_bytes_versioned(data, KEY_VERSION)?;
365 let record_type = RecordType::from_id(prefix.tag().record_type())?;
366 if record_type != RecordType::ListingEntry {
367 return Err(Error::Encoding(format!(
368 "invalid record type: expected ListingEntry, got {:?}",
369 record_type
370 )));
371 }
372
373 if data.len() < 6 {
374 return Err(Error::Encoding(
375 "buffer too short for listing entry key".to_string(),
376 ));
377 }
378
379 let segment_id = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
380 let key = Bytes::copy_from_slice(&data[6..]);
381
382 Ok(ListingEntryKey { segment_id, key })
383 }
384
385 pub fn scan_range(range: Range<SegmentId>) -> BytesRange {
390 let start = Bound::Included(Self::segment_prefix(range.start));
391 let end = Bound::Excluded(Self::segment_prefix(range.end));
392 BytesRange::new(start, end)
393 }
394
395 fn segment_prefix(segment_id: SegmentId) -> Bytes {
397 let mut buf = BytesMut::with_capacity(6);
398 RecordType::ListingEntry.prefix().write_to(&mut buf);
399 buf.put_u32(segment_id);
400 buf.freeze()
401 }
402}
403
404#[derive(Debug, Clone, Default, PartialEq, Eq)]
409pub struct ListingEntryValue;
410
411impl ListingEntryValue {
412 pub fn new() -> Self {
414 Self
415 }
416
417 pub fn serialize(&self) -> Bytes {
419 Bytes::new()
420 }
421
422 pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
424 if !data.is_empty() {
425 return Err(Error::Encoding(format!(
426 "listing entry value should be empty, got {} bytes",
427 data.len()
428 )));
429 }
430 Ok(ListingEntryValue)
431 }
432}
433
434pub(crate) struct LogEntryBuilder;
451
452impl LogEntryBuilder {
453 pub(crate) fn build(
461 segment: &crate::segment::LogSegment,
462 base_sequence: u64,
463 user_records: &[crate::model::Record],
464 records: &mut Vec<common::Record>,
465 ) {
466 let segment_start_seq = segment.meta().start_seq;
467
468 for (i, user_record) in user_records.iter().enumerate() {
469 let sequence = base_sequence + i as u64;
470 let entry_key = LogEntryKey::new(segment.id(), user_record.key.clone(), sequence);
471 let storage_record = common::Record::new(
472 entry_key.serialize(segment_start_seq),
473 user_record.value.clone(),
474 );
475 records.push(storage_record);
476 }
477 }
478}
479
// Unit and property tests for the key/value encodings in this module.
#[cfg(test)]
mod tests {
    use super::*;
    use std::ops::RangeBounds;

    #[test]
    fn should_convert_record_type_to_id_and_back() {
        // given
        let log_entry = RecordType::LogEntry;
        let seq_block = RecordType::SeqBlock;
        let segment_meta = RecordType::SegmentMeta;
        let listing_entry = RecordType::ListingEntry;

        // when/then: ids equal the discriminants and round-trip through from_id
        assert_eq!(log_entry.id(), 0x01);
        assert_eq!(seq_block.id(), 0x02);
        assert_eq!(segment_meta.id(), 0x03);
        assert_eq!(listing_entry.id(), 0x04);
        assert_eq!(RecordType::from_id(0x01).unwrap(), RecordType::LogEntry);
        assert_eq!(RecordType::from_id(0x02).unwrap(), RecordType::SeqBlock);
        assert_eq!(RecordType::from_id(0x03).unwrap(), RecordType::SegmentMeta);
        assert_eq!(RecordType::from_id(0x04).unwrap(), RecordType::ListingEntry);
    }

    #[test]
    fn should_reject_invalid_record_type() {
        // given
        let invalid_byte = 0x99;

        // when
        let result = RecordType::from_id(invalid_byte);

        // then
        assert!(result.is_err());
    }

    #[test]
    fn should_serialize_and_deserialize_log_entry_key() {
        // given
        let segment_start_seq = 10000;
        let key = LogEntryKey::new(42, Bytes::from("test_key"), 12345);

        // when
        let serialized = key.serialize(segment_start_seq);
        let deserialized = LogEntryKey::deserialize(&serialized, segment_start_seq).unwrap();

        // then
        assert_eq!(deserialized.segment_id, 42);
        assert_eq!(deserialized.key, Bytes::from("test_key"));
        assert_eq!(deserialized.sequence, 12345);
    }

    #[test]
    fn should_serialize_log_entry_key_with_correct_structure() {
        // given
        let segment_start_seq = 0;
        let key = LogEntryKey::new(1, Bytes::from("k"), 100);

        // when
        let serialized = key.serialize(segment_start_seq);

        // then: the layout is version (1) + tag (1) + segment_id (4) + "k" (1)
        // + terminator (1) + var_u64 of relative sequence 100 (2) = 10 bytes
        assert_eq!(serialized.len(), 10);
        assert_eq!(serialized[0], KEY_VERSION);
        assert_eq!(serialized[1], RecordType::LogEntry.tag().as_byte());
        // the LogEntry tag byte
        assert_eq!(serialized[1], 0x10);
        // segment_id 1 as a big-endian u32
        assert_eq!(&serialized[2..6], &[0, 0, 0, 1]);
        // the user key
        assert_eq!(serialized[6], b'k');
        // the key terminator
        assert_eq!(serialized[7], 0x00);
        // var_u64 encoding of 100
        assert_eq!(&serialized[8..10], &[0x10, 0x64]);
    }

    #[test]
    fn should_serialize_relative_sequence() {
        // given: sequence 1005 in a segment starting at 1000
        let segment_start_seq = 1000;
        let key = LogEntryKey::new(1, Bytes::from("k"), 1005);

        // when
        let serialized = key.serialize(segment_start_seq);

        // then: only the relative sequence 5 is stored, as a single varint byte
        assert_eq!(serialized.len(), 9);
        assert_eq!(serialized[8], 0x05);
    }

    #[test]
    fn should_order_log_entries_by_segment_then_key_then_sequence() {
        // given
        let segment_start_seq = 0;
        let key1 = LogEntryKey::new(0, Bytes::from("a"), 1);
        let key2 = LogEntryKey::new(0, Bytes::from("a"), 2);
        let key3 = LogEntryKey::new(0, Bytes::from("b"), 1);
        let segment1_start_seq = 100;
        let key4 = LogEntryKey::new(1, Bytes::from("a"), 101);

        // when
        let s1 = key1.serialize(segment_start_seq);
        let s2 = key2.serialize(segment_start_seq);
        let s3 = key3.serialize(segment_start_seq);
        let s4 = key4.serialize(segment1_start_seq);

        // then: encoded keys compare byte-wise in segment, key, sequence order
        assert!(s1 < s2, "same segment/key, seq 1 < seq 2");
        assert!(s2 < s3, "same segment, key 'a' < key 'b'");
        assert!(s3 < s4, "segment 0 < segment 1");
    }

    #[test]
    fn should_create_record_tag() {
        // given
        let log_entry_tag = RecordType::LogEntry.tag();
        let seq_block_tag = RecordType::SeqBlock.tag();
        let segment_meta_tag = RecordType::SegmentMeta.tag();
        let listing_entry_tag = RecordType::ListingEntry.tag();

        // then: each tag byte carries the record type id in its high nibble
        assert_eq!(log_entry_tag.as_byte(), 0x10);
        assert_eq!(seq_block_tag.as_byte(), 0x20);
        assert_eq!(segment_meta_tag.as_byte(), 0x30);
        assert_eq!(listing_entry_tag.as_byte(), 0x40);
    }

    #[test]
    fn should_fail_deserialize_log_entry_key_too_short() {
        // given: a valid prefix followed by only 3 of the 4 segment_id bytes
        let data = vec![KEY_VERSION, RecordType::LogEntry.tag().as_byte(), 0, 0, 0];

        // when
        let result = LogEntryKey::deserialize(&data, 0);

        // then
        assert!(result.is_err());
    }

    #[test]
    fn should_serialize_and_deserialize_listing_entry_key() {
        // given
        let key = ListingEntryKey::new(42, Bytes::from("test_key"));

        // when
        let serialized = key.serialize();
        let deserialized = ListingEntryKey::deserialize(&serialized).unwrap();

        // then
        assert_eq!(deserialized.segment_id, 42);
        assert_eq!(deserialized.key, Bytes::from("test_key"));
    }

    #[test]
    fn should_serialize_listing_entry_key_with_correct_structure() {
        // given
        let key = ListingEntryKey::new(1, Bytes::from("k"));

        // when
        let serialized = key.serialize();

        // then: the layout is version (1) + tag (1) + segment_id (4) + raw key "k" (1)
        assert_eq!(serialized.len(), 7);
        assert_eq!(serialized[0], KEY_VERSION);
        assert_eq!(serialized[1], RecordType::ListingEntry.tag().as_byte());
        // the ListingEntry tag byte
        assert_eq!(serialized[1], 0x40);
        // segment_id 1 as a big-endian u32
        assert_eq!(&serialized[2..6], &[0, 0, 0, 1]);
        // the user key, with no terminator
        assert_eq!(serialized[6], b'k');
    }

    #[test]
    fn should_serialize_listing_entry_key_with_empty_key() {
        // given
        let key = ListingEntryKey::new(1, Bytes::new());

        // when
        let serialized = key.serialize();
        let deserialized = ListingEntryKey::deserialize(&serialized).unwrap();

        // then: only the 6-byte header remains, and it round-trips
        assert_eq!(serialized.len(), 6);
        assert_eq!(deserialized.segment_id, 1);
        assert_eq!(deserialized.key, Bytes::new());
    }

    #[test]
    fn should_order_listing_entries_by_segment_then_key() {
        // given
        let key1 = ListingEntryKey::new(0, Bytes::from("a"));
        let key2 = ListingEntryKey::new(0, Bytes::from("b"));
        let key3 = ListingEntryKey::new(1, Bytes::from("a"));

        // when
        let s1 = key1.serialize();
        let s2 = key2.serialize();
        let s3 = key3.serialize();

        // then
        assert!(s1 < s2, "same segment, key 'a' < key 'b'");
        assert!(s2 < s3, "segment 0 < segment 1");
    }

    #[test]
    fn should_create_listing_entry_scan_range() {
        // given
        let range = 1..3;

        // when
        let scan_range = ListingEntryKey::scan_range(range);

        // then: both bounds are the bare segment prefixes (an empty-key entry)
        let start_key = ListingEntryKey::new(1, Bytes::new()).serialize();
        let end_key = ListingEntryKey::new(3, Bytes::new()).serialize();

        assert_eq!(scan_range.start_bound(), Bound::Included(&start_key));
        assert_eq!(scan_range.end_bound(), Bound::Excluded(&end_key));
    }

    #[test]
    fn should_serialize_and_deserialize_listing_entry_value() {
        // given
        let value = ListingEntryValue::new();

        // when
        let serialized = value.serialize();
        let deserialized = ListingEntryValue::deserialize(&serialized).unwrap();

        // then
        assert!(serialized.is_empty());
        assert_eq!(deserialized, ListingEntryValue);
    }

    #[test]
    fn should_fail_deserialize_listing_entry_value_with_data() {
        // given: a non-empty payload
        let data = vec![0x01, 0x02];

        // when
        let result = ListingEntryValue::deserialize(&data);

        // then
        assert!(result.is_err());
    }

    #[test]
    fn should_fail_deserialize_listing_entry_key_too_short() {
        // given: a valid prefix followed by only 3 of the 4 segment_id bytes
        let data = vec![
            KEY_VERSION,
            RecordType::ListingEntry.tag().as_byte(),
            0,
            0,
            0,
        ];

        // when
        let result = ListingEntryKey::deserialize(&data);

        // then
        assert!(result.is_err());
    }

    mod proptests {
        use proptest::prelude::*;

        use super::*;

        proptest! {
            // Byte order of encoded keys must match numeric order of sequences.
            #[test]
            fn should_preserve_sequence_ordering(a: u64, b: u64) {
                let segment_start_seq = 0;
                let key_a = LogEntryKey::new(0, Bytes::from("key"), a);
                let key_b = LogEntryKey::new(0, Bytes::from("key"), b);

                let enc_a = key_a.serialize(segment_start_seq);
                let enc_b = key_b.serialize(segment_start_seq);

                prop_assert_eq!(
                    a.cmp(&b),
                    enc_a.cmp(&enc_b),
                    "ordering mismatch: a={}, b={}, enc_a={:?}, enc_b={:?}",
                    a, b, enc_a.as_ref(), enc_b.as_ref()
                );
            }

            // Any entry whose segment lies in start..end falls inside the scan range.
            #[test]
            fn should_include_listing_entry_in_scan_range(
                start in 0u32..1000,
                range_size in 1u32..100,
                offset in 0u32..100,
                key_bytes in prop::collection::vec(any::<u8>(), 1..100),
            ) {
                let end = start.saturating_add(range_size);
                // offset % range_size keeps segment_id within start..end
                let segment_id = start.saturating_add(offset % range_size);

                let key = ListingEntryKey::new(segment_id, Bytes::from(key_bytes));
                let serialized = key.serialize();

                let scan_range = ListingEntryKey::scan_range(start..end);

                prop_assert!(
                    scan_range.contains(&serialized),
                    "listing entry for segment {} with key should be in range {}..{}, \
                     serialized={:?}, range_start={:?}, range_end={:?}",
                    segment_id, start, end,
                    serialized.as_ref(),
                    scan_range.start_bound(),
                    scan_range.end_bound()
                );
            }
        }
    }
}