Skip to main content

exoware_sdk_rs/
keys.rs

1use bytes::{Bytes, BytesMut};
2
3/// Maximum physical key length in bytes.
4///
5/// The storage stack is intentionally non-backwards-compatible with prior
6/// fixed-width key formats. All keys must now fit in a single `u8` length
7/// field, so the largest valid key is 254 bytes.
8pub const MAX_KEY_LEN: usize = 254;
9
10/// Maximum key scratch width retained for stack buffers and legacy callers.
11///
12/// This is no longer a fixed physical key width. It is simply the largest
13/// valid key length in bytes.
14pub const KEY_SIZE: usize = MAX_KEY_LEN;
15
16/// Minimum physical key length in bytes.
17pub const MIN_KEY_LEN: usize = 0;
18
19/// A store key. Variable-length, lexicographically ordered raw bytes.
20///
21/// Stored as [`Bytes`] so keys can share backing storage (e.g. RPC wire buffers via
22/// [`bytes::Bytes::slice_ref`]) without copying.
23pub type Key = Bytes;
24
25/// Mutable buffer used while constructing a key with [`KeyCodec`]. Freeze to [`Key`] with
26/// [`Bytes::from`] / [`BytesMut::freeze`] when done mutating.
27pub type KeyMut = BytesMut;
28
29/// A store value. Variable length.
30pub type Value = Bytes;
31
32/// Errors returned by key-length validation helpers.
33#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
34pub enum KeyValidationError {
35    #[error("key length {len} exceeds max {max}")]
36    TooLong { len: usize, max: usize },
37}
38
39/// Errors returned by [`KeyCodec`].
40#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
41pub enum KeyCodecError {
42    #[error("key length {len} is outside valid range [{min}, {max}]")]
43    InvalidKeyLength { len: usize, min: usize, max: usize },
44    #[error("payload length {payload_len} exceeds codec capacity {max_payload_len}")]
45    PayloadTooLarge {
46        payload_len: usize,
47        max_payload_len: usize,
48    },
49    #[error("payload range offset={offset} len={len} exceeds codec capacity {max_payload_len}")]
50    PayloadRangeOutOfBounds {
51        offset: usize,
52        len: usize,
53        max_payload_len: usize,
54    },
55    #[error("key does not match this codec prefix")]
56    PrefixMismatch,
57}
58
59/// Bit-packed key layout: a small prefix id in the leading reserved bits, payload in the remainder.
60///
61/// For example, with 4 reserved bits the first nibble selects the prefix and the rest of the key
62/// carries the encoded logical payload.
63#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
64pub struct KeyCodec {
65    reserved_bits: u8,
66    prefix: u16,
67}
68
69impl KeyCodec {
70    /// Build a codec with `reserved_bits` high bits reserved for `prefix`.
71    pub const fn new(reserved_bits: u8, prefix: u16) -> Self {
72        assert!(reserved_bits <= 16, "reserved bits must be <= 16");
73        let max_prefix = prefix_bit_mask(reserved_bits);
74        assert!(prefix <= max_prefix, "prefix does not fit in reserved bits");
75        Self {
76            reserved_bits,
77            prefix,
78        }
79    }
80
81    /// The number of reserved high bits at the start of the key.
82    #[inline]
83    pub const fn reserved_bits(self) -> u8 {
84        self.reserved_bits
85    }
86
87    /// Family id stored in the reserved high bits.
88    #[inline]
89    pub const fn prefix(self) -> u16 {
90        self.prefix
91    }
92
93    /// Minimum key length in bytes needed to store this codec's reserved bits.
94    #[inline]
95    pub const fn min_key_len(self) -> usize {
96        (self.reserved_bits as usize).div_ceil(8)
97    }
98
99    /// Maximum logical payload bytes that can ever fit under this codec.
100    #[inline]
101    pub const fn max_payload_capacity_bytes(self) -> usize {
102        ((MAX_KEY_LEN * 8) - self.reserved_bits as usize) / 8
103    }
104
105    /// Maximum logical payload bytes that fit under this codec.
106    ///
107    /// This reflects the global physical key cap rather than a per-key fixed
108    /// width. Families that need an exact key length should use
109    /// [`Self::payload_capacity_bytes_for_key_len`].
110    #[inline]
111    pub const fn payload_capacity_bytes(self) -> usize {
112        self.max_payload_capacity_bytes()
113    }
114
115    /// Maximum logical payload bytes that fit in a key of `key_len` bytes.
116    #[inline]
117    pub const fn payload_capacity_bytes_for_key_len(self, key_len: usize) -> usize {
118        ((key_len * 8).saturating_sub(self.reserved_bits as usize)) / 8
119    }
120
121    /// Smallest physical key length that can store `payload_len` bytes.
122    #[inline]
123    pub const fn min_key_len_for_payload(self, payload_len: usize) -> usize {
124        (self.reserved_bits as usize + payload_len * 8).div_ceil(8)
125    }
126
127    /// Absolute bit offset for a payload byte offset within the physical key.
128    #[inline]
129    pub const fn payload_bit_offset(self, payload_byte_offset: usize) -> usize {
130        self.reserved_bits as usize + (payload_byte_offset * 8)
131    }
132
133    /// Create a zero-filled key buffer with the prefix id written in the reserved bits.
134    pub fn new_key_with_len(self, total_bytes: usize) -> Result<KeyMut, KeyCodecError> {
135        self.validate_key_len(total_bytes)?;
136        let mut key = BytesMut::with_capacity(total_bytes);
137        key.resize(total_bytes, 0);
138        write_prefix_bits(&mut key, self.reserved_bits, self.prefix);
139        Ok(key)
140    }
141
142    /// Create the shortest zero-filled key for this prefix.
143    pub fn new_key(self) -> Key {
144        Bytes::from(
145            self.new_key_with_len(self.min_key_len())
146                .expect("minimum codec key length must always be valid"),
147        )
148    }
149
150    /// Encode an entire logical payload into a physical key.
151    pub fn encode(self, payload: &[u8]) -> Result<Key, KeyCodecError> {
152        let max_payload_len = self.max_payload_capacity_bytes();
153        if payload.len() > max_payload_len {
154            return Err(KeyCodecError::PayloadTooLarge {
155                payload_len: payload.len(),
156                max_payload_len,
157            });
158        }
159        let mut key = self.new_key_with_len(self.min_key_len_for_payload(payload.len()))?;
160        self.write_payload(&mut key, 0, payload)?;
161        Ok(key.freeze())
162    }
163
164    /// Decode `payload_len` bytes from a key that belongs to this codec.
165    pub fn decode(self, key: &Key, payload_len: usize) -> Result<Vec<u8>, KeyCodecError> {
166        if !self.matches(key) {
167            return Err(KeyCodecError::PrefixMismatch);
168        }
169        self.read_payload(key, 0, payload_len)
170    }
171
172    /// Write logical payload bytes at a byte offset within the shifted payload.
173    pub fn write_payload(
174        self,
175        key: &mut KeyMut,
176        payload_byte_offset: usize,
177        bytes: &[u8],
178    ) -> Result<(), KeyCodecError> {
179        self.ensure_payload_range(key.len(), payload_byte_offset, bytes.len())?;
180        let start_bit = self.payload_bit_offset(payload_byte_offset);
181        write_bits_from_bytes(key, start_bit, bytes, bytes.len() * 8);
182        Ok(())
183    }
184
185    /// Fill a payload byte range with a repeated byte value.
186    pub fn fill_payload(
187        self,
188        key: &mut KeyMut,
189        payload_byte_offset: usize,
190        len: usize,
191        value: u8,
192    ) -> Result<(), KeyCodecError> {
193        self.ensure_payload_range(key.len(), payload_byte_offset, len)?;
194        if len == 0 {
195            return Ok(());
196        }
197        self.write_payload(key, payload_byte_offset, &vec![value; len])
198    }
199
200    /// Read logical payload bytes at a byte offset within the shifted payload.
201    pub fn read_payload(
202        self,
203        key: &Key,
204        payload_byte_offset: usize,
205        len: usize,
206    ) -> Result<Vec<u8>, KeyCodecError> {
207        self.ensure_payload_range(key.len(), payload_byte_offset, len)?;
208        let start_bit = self.payload_bit_offset(payload_byte_offset);
209        let mut out = vec![0u8; len];
210        read_bits_to_bytes(key, start_bit, &mut out, len * 8);
211        Ok(out)
212    }
213
214    /// Read an exact-size payload slice into an array.
215    pub fn read_payload_exact<const N: usize>(
216        self,
217        key: &Key,
218        payload_byte_offset: usize,
219    ) -> Result<[u8; N], KeyCodecError> {
220        let bytes = self.read_payload(key, payload_byte_offset, N)?;
221        let mut out = [0u8; N];
222        out.copy_from_slice(&bytes);
223        Ok(out)
224    }
225
226    /// Copy payload bytes from one codec-managed key into another.
227    pub fn copy_payload(
228        self,
229        src: &Key,
230        src_payload_byte_offset: usize,
231        dst: &mut KeyMut,
232        dst_payload_byte_offset: usize,
233        len: usize,
234    ) -> Result<(), KeyCodecError> {
235        let bytes = self.read_payload(src, src_payload_byte_offset, len)?;
236        self.write_payload(dst, dst_payload_byte_offset, &bytes)
237    }
238
239    /// True when the key belongs to this codec prefix.
240    #[inline]
241    pub fn matches(self, key: &Key) -> bool {
242        key.len() >= self.min_key_len() && read_prefix_bits(key, self.reserved_bits) == self.prefix
243    }
244
245    /// Inclusive lower and upper bounds for keys in this prefix with a fixed byte length.
246    pub fn prefix_bounds_for_len(self, total_bytes: usize) -> Result<(Key, Key), KeyCodecError> {
247        self.validate_key_len(total_bytes)?;
248        let mut start = vec![0u8; total_bytes];
249        let mut end = vec![0xFFu8; total_bytes];
250        write_prefix_bits(&mut start, self.reserved_bits, self.prefix);
251        write_prefix_bits(&mut end, self.reserved_bits, self.prefix);
252        Ok((Bytes::from(start), Bytes::from(end)))
253    }
254
255    /// Inclusive lower and upper bounds for this prefix across the full supported key-length domain.
256    pub fn prefix_bounds(self) -> (Key, Key) {
257        let start = Bytes::from(
258            self.new_key_with_len(self.min_key_len())
259                .expect("minimum codec key length must always be valid"),
260        );
261        let mut end = vec![0xFFu8; MAX_KEY_LEN];
262        write_prefix_bits(&mut end, self.reserved_bits, self.prefix);
263        (start, Bytes::from(end))
264    }
265
266    fn validate_key_len(self, len: usize) -> Result<(), KeyCodecError> {
267        if !(MIN_KEY_LEN..=MAX_KEY_LEN).contains(&len) || len < self.min_key_len() {
268            return Err(KeyCodecError::InvalidKeyLength {
269                len,
270                min: self.min_key_len(),
271                max: MAX_KEY_LEN,
272            });
273        }
274        Ok(())
275    }
276
277    fn ensure_payload_range(
278        self,
279        key_len: usize,
280        offset: usize,
281        len: usize,
282    ) -> Result<(), KeyCodecError> {
283        self.validate_key_len(key_len)?;
284        let max_payload_len = self.payload_capacity_bytes_for_key_len(key_len);
285        if offset.saturating_add(len) > max_payload_len {
286            return Err(KeyCodecError::PayloadRangeOutOfBounds {
287                offset,
288                len,
289                max_payload_len,
290            });
291        }
292        Ok(())
293    }
294}
295
296/// True when a key size is valid for ingest/storage.
297#[inline]
298pub fn is_valid_key_size(size: usize) -> bool {
299    (MIN_KEY_LEN..=MAX_KEY_LEN).contains(&size)
300}
301
302/// Validate a key length against the store's physical key limits.
303#[inline]
304pub fn validate_key_size(size: usize) -> Result<(), KeyValidationError> {
305    if is_valid_key_size(size) {
306        Ok(())
307    } else {
308        Err(KeyValidationError::TooLong {
309            len: size,
310            max: MAX_KEY_LEN,
311        })
312    }
313}
314
315/// Return the smallest valid key strictly greater than `key`.
316///
317/// Lexicographic ordering on variable-length byte strings means appending a
318/// trailing `0x00` is the immediate successor whenever the key is shorter than
319/// `MAX_KEY_LEN`.
320pub fn next_key(key: &Key) -> Option<Key> {
321    if key.len() < MAX_KEY_LEN {
322        let mut out = key.to_vec();
323        out.push(0);
324        return Some(Bytes::from(out));
325    }
326
327    let mut out = key.to_vec();
328    for idx in (0..out.len()).rev() {
329        if out[idx] != u8::MAX {
330            out[idx] += 1;
331            out.truncate(idx + 1);
332            return Some(Bytes::from(out));
333        }
334    }
335    None
336}
337
338#[inline]
339const fn prefix_bit_mask(bits: u8) -> u16 {
340    if bits == 0 {
341        0
342    } else if bits >= 16 {
343        u16::MAX
344    } else {
345        (1u16 << bits) - 1
346    }
347}
348
349fn write_prefix_bits(key: &mut [u8], reserved_bits: u8, prefix: u16) {
350    for bit_idx in 0..reserved_bits as usize {
351        let shift = reserved_bits as usize - 1 - bit_idx;
352        let value = ((prefix >> shift) & 1) != 0;
353        write_bit_be(key, bit_idx, value);
354    }
355}
356
357fn read_prefix_bits(key: &Key, reserved_bits: u8) -> u16 {
358    let mut prefix = 0u16;
359    for bit_idx in 0..reserved_bits as usize {
360        prefix <<= 1;
361        if read_bit_be(key, bit_idx) {
362            prefix |= 1;
363        }
364    }
365    prefix
366}
367
368pub(crate) fn write_bits_from_bytes(
369    dst: &mut [u8],
370    dst_bit_offset: usize,
371    src: &[u8],
372    bit_len: usize,
373) {
374    for bit_idx in 0..bit_len {
375        let value = read_bit_be(src, bit_idx);
376        write_bit_be(dst, dst_bit_offset + bit_idx, value);
377    }
378}
379
380pub(crate) fn read_bits_to_bytes(
381    src: &[u8],
382    src_bit_offset: usize,
383    dst: &mut [u8],
384    bit_len: usize,
385) {
386    dst.fill(0);
387    for bit_idx in 0..bit_len {
388        let value = read_bit_be(src, src_bit_offset + bit_idx);
389        write_bit_be(dst, bit_idx, value);
390    }
391}
392
393pub(crate) fn read_bit_be(bytes: &[u8], bit_idx: usize) -> bool {
394    let byte_idx = bit_idx / 8;
395    let bit_in_byte = 7 - (bit_idx % 8);
396    bytes
397        .get(byte_idx)
398        .is_some_and(|byte| ((byte >> bit_in_byte) & 1) != 0)
399}
400
401pub(crate) fn write_bit_be(bytes: &mut [u8], bit_idx: usize, value: bool) {
402    let byte_idx = bit_idx / 8;
403    let bit_in_byte = 7 - (bit_idx % 8);
404    let mask = 1 << bit_in_byte;
405    if let Some(byte) = bytes.get_mut(byte_idx) {
406        if value {
407            *byte |= mask;
408        } else {
409            *byte &= !mask;
410        }
411    }
412}
413
414/// Target block size for each tier.
415pub fn target_block_size(tier: u8) -> usize {
416    const KB: usize = 1024;
417    const MB: usize = 1024 * KB;
418    const GB: usize = 1024 * MB;
419    match tier {
420        0 => 64 * MB,
421        1 => 128 * MB,
422        2 => 256 * MB,
423        3 => 512 * MB,
424        4 => GB,
425        5 => 2 * GB,
426        _ => 2 * GB,
427    }
428}
429
430/// Sub-block size: fixed at 64KB uncompressed across all tiers.
431///
432/// We compress sub-blocks from this fixed logical size so every decoded chunk
433/// has a small, known upper bound. That keeps compaction and other merge/decode
434/// paths incremental instead of materializing huge blocks in memory, which
435/// avoids OOM risk. It also limits waste on point lookups: a cache miss fetches
436/// and decompresses only one small slice instead of an entire large block,
437/// improving cache granularity. Range scans still coalesce adjacent sub-blocks
438/// into larger range GETs, so we keep fine-grained point reads without forcing
439/// one network request per sub-block on sequential reads.
440pub const SUB_BLOCK_SIZE: usize = 64 * 1024;
441
442/// Bloom filter bits per key by tier.
443pub fn bloom_bits_per_key(tier: u8) -> Option<u32> {
444    match tier {
445        0 => Some(14),
446        1..=3 => Some(10),
447        _ => None, // T4-T5: no bloom
448    }
449}
450
451/// Compute tier from block age.
452pub fn compute_tier(age: chrono::Duration) -> u8 {
453    if age < chrono::Duration::hours(4) {
454        0
455    } else if age < chrono::Duration::hours(12) {
456        1
457    } else if age < chrono::Duration::hours(36) {
458        2
459    } else if age < chrono::Duration::days(4) {
460        3
461    } else if age < chrono::Duration::days(30) {
462        4
463    } else {
464        5
465    }
466}
467
468#[cfg(test)]
469mod tests {
470    use super::*;
471
472    #[test]
473    fn key_codec_uses_4_reserved_bits_plus_payload() {
474        let codec = KeyCodec::new(4, 0x5);
475        let key = codec.encode(&[0xAB, 0xCD]).expect("encoded key");
476        assert_eq!(key[0], 0x5A);
477        assert_eq!(key[1], 0xBC);
478        assert_eq!(
479            codec.decode(&key, 2).expect("decoded payload"),
480            vec![0xAB, 0xCD]
481        );
482    }
483
484    #[test]
485    fn key_codec_preserves_payload_order_within_prefix() {
486        let codec = KeyCodec::new(4, 0x3);
487        let lower = codec.encode(&[0x10, 0x00]).expect("lower key");
488        let mid = codec.encode(&[0x10, 0x01]).expect("mid key");
489        let upper = codec.encode(&[0x20, 0x00]).expect("upper key");
490        assert!(lower < mid);
491        assert!(mid < upper);
492    }
493
494    #[test]
495    fn key_codec_prefix_bounds_cover_only_one_prefix() {
496        let codec = KeyCodec::new(4, 0xA);
497        let (start, end) = codec.prefix_bounds();
498        let key = codec.encode(&[0x11, 0x22]).expect("prefix key");
499        assert!(codec.matches(&start));
500        assert!(codec.matches(&end));
501        assert!(codec.matches(&key));
502        assert!(start <= key && key <= end);
503
504        let other = KeyCodec::new(4, 0xB)
505            .encode(&[0x11, 0x22])
506            .expect("other key");
507        assert!(!codec.matches(&other));
508        assert!(other > end || other < start);
509        assert!(start <= end);
510    }
511
512    #[test]
513    fn key_codec_reads_and_writes_payload_at_byte_offsets() {
514        let codec = KeyCodec::new(3, 0b101);
515        let mut key = codec.new_key_with_len(4).expect("new key");
516        codec
517            .write_payload(&mut key, 0, &[0xDE, 0xAD])
518            .expect("write head");
519        codec
520            .write_payload(&mut key, 2, &[0xBE])
521            .expect("write tail");
522        assert_eq!(
523            codec
524                .read_payload(&key.freeze(), 0, 3)
525                .expect("read payload"),
526            vec![0xDE, 0xAD, 0xBE]
527        );
528    }
529
530    #[test]
531    #[should_panic(expected = "prefix does not fit in reserved bits")]
532    fn key_codec_rejects_out_of_range_prefix() {
533        KeyCodec::new(3, 0b1000);
534    }
535
536    #[test]
537    fn key_codec_rejects_oversized_payload() {
538        let codec = KeyCodec::new(4, 0);
539        let payload = vec![0u8; codec.max_payload_capacity_bytes() + 1];
540        let err = codec.encode(&payload).expect_err("payload should not fit");
541        assert!(matches!(err, KeyCodecError::PayloadTooLarge { .. }));
542    }
543
544    #[test]
545    fn next_key_prefers_append_zero_for_short_keys() {
546        assert_eq!(
547            next_key(&Bytes::from(vec![0x12, 0x34])),
548            Some(Bytes::from(vec![0x12, 0x34, 0x00]))
549        );
550    }
551
552    #[test]
553    fn next_key_carries_when_at_max_length() {
554        let mut key = vec![0u8; MAX_KEY_LEN];
555        key[MAX_KEY_LEN - 2] = 0x12;
556        key[MAX_KEY_LEN - 1] = 0xFF;
557        let key = Bytes::from(key);
558        let next = next_key(&key).expect("next");
559        assert_eq!(next.len(), MAX_KEY_LEN - 1);
560        assert_eq!(next[MAX_KEY_LEN - 2], 0x13);
561    }
562
563    #[test]
564    fn key_size_validation_bounds() {
565        assert!(validate_key_size(0).is_ok());
566        assert!(validate_key_size(MAX_KEY_LEN).is_ok());
567        assert!(validate_key_size(MAX_KEY_LEN + 1).is_err());
568    }
569
570    #[test]
571    fn tier_assignment() {
572        assert_eq!(compute_tier(chrono::Duration::minutes(30)), 0);
573        assert_eq!(compute_tier(chrono::Duration::hours(6)), 1);
574        assert_eq!(compute_tier(chrono::Duration::hours(24)), 2);
575        assert_eq!(compute_tier(chrono::Duration::days(3)), 3);
576        assert_eq!(compute_tier(chrono::Duration::days(15)), 4);
577        assert_eq!(compute_tier(chrono::Duration::days(60)), 5);
578    }
579}