Skip to main content

fast_cache/storage/embedded_store/
routing.rs

1use crate::storage::{Bytes, hash_key};
2
3#[cfg(feature = "unsafe")]
4use super::SessionSlotMap;
5
6/// Precomputed routing metadata for one key.
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub struct EmbeddedKeyRoute {
9    /// Shard selected for the key.
10    pub shard_id: usize,
11    /// Precomputed primary key hash.
12    pub key_hash: u64,
13}
14
15/// Precomputed shard placement for one session prefix.
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub struct EmbeddedSessionRoute {
18    /// Shard selected for the session.
19    pub shard_id: usize,
20}
21
22/// Selects how embedded database traffic is routed across shards.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
24pub enum EmbeddedRouteMode {
25    /// Route by the full key bytes. This matches the generic store behavior.
26    #[default]
27    FullKey,
28    /// Route all `s:<session>:c:<chunk>` keys for a session to the same shard.
29    SessionPrefix,
30}
31
32impl EmbeddedRouteMode {
33    /// Returns the stable configuration string for this route mode.
34    #[inline(always)]
35    pub fn as_str(self) -> &'static str {
36        match self {
37            Self::FullKey => "full_key",
38            Self::SessionPrefix => "session_prefix",
39        }
40    }
41}
42
43#[inline(always)]
44pub(crate) fn compute_key_route(
45    route_mode: EmbeddedRouteMode,
46    shift: u32,
47    key: &[u8],
48) -> EmbeddedKeyRoute {
49    let key_hash = hash_key(key);
50    let route_hash = match route_mode {
51        EmbeddedRouteMode::FullKey => key_hash,
52        EmbeddedRouteMode::SessionPrefix => hash_key(session_route_prefix(key)),
53    };
54    EmbeddedKeyRoute {
55        shard_id: stripe_index(route_hash, shift),
56        key_hash,
57    }
58}
59
60#[inline(always)]
61pub(crate) fn compute_session_shard(shift: u32, session_prefix: &[u8]) -> usize {
62    stripe_index(hash_key(session_prefix), shift)
63}
64
65#[inline(always)]
66pub fn stripe_index(hash: u64, shift: u32) -> usize {
67    if shift == usize::BITS {
68        0
69    } else {
70        ((hash as usize) << 7) >> shift
71    }
72}
73
74#[inline(always)]
75pub fn shift_for(shard_count: usize) -> u32 {
76    debug_assert!(shard_count > 0 && shard_count.is_power_of_two());
77    usize::BITS - shard_count.trailing_zeros()
78}
79
80#[inline(always)]
81pub(crate) fn assert_valid_shard_count(shard_count: usize) {
82    assert!(
83        shard_count > 0 && shard_count.is_power_of_two(),
84        "shard_count must be a non-zero power of two; got {shard_count}"
85    );
86}
87
88#[cfg(feature = "unsafe")]
89#[inline(always)]
90pub(super) fn can_skip_session_lookup(key: &[u8], session_slots: &SessionSlotMap) -> bool {
91    session_slots.is_empty() || (!key.starts_with(b"s:") && !key.contains(&b'@'))
92}
93
94#[inline(always)]
95pub(super) fn can_route_with_key_hash(
96    route_mode: EmbeddedRouteMode,
97    shard_count: usize,
98    key: &[u8],
99) -> bool {
100    route_mode == EmbeddedRouteMode::FullKey || shard_count == 1 || !key.starts_with(b"s:")
101}
102
103#[inline(always)]
104pub(super) fn can_use_route_hash_as_key_hash(route_mode: EmbeddedRouteMode, key: &[u8]) -> bool {
105    route_mode == EmbeddedRouteMode::FullKey || !key.starts_with(b"s:")
106}
107
108#[inline(always)]
109pub(super) fn uses_flat_key_storage(route_mode: EmbeddedRouteMode, key: &[u8]) -> bool {
110    route_mode == EmbeddedRouteMode::FullKey || derived_session_storage_prefix(key).is_none()
111}
112
113#[inline(always)]
114pub(super) fn session_route_prefix(key: &[u8]) -> &[u8] {
115    if !key.starts_with(b"s:") {
116        return key;
117    }
118
119    if let Some(index) = session_chunk_separator(key) {
120        return &key[..index];
121    }
122
123    key
124}
125
126#[inline(always)]
127fn session_chunk_separator(key: &[u8]) -> Option<usize> {
128    if key.len() < 3 {
129        return None;
130    }
131
132    let mut index = key.len() - 3;
133    loop {
134        if key[index] == b':' && key[index + 1] == b'c' && key[index + 2] == b':' {
135            return Some(index);
136        }
137        if index == 0 {
138            return None;
139        }
140        index -= 1;
141    }
142}
143
144#[inline(always)]
145pub(super) fn derived_session_storage_prefix(key: &[u8]) -> Option<Bytes> {
146    if key.starts_with(b"s:") {
147        return Some(session_route_prefix(key).to_vec());
148    }
149
150    // Fast reject ordinary keys before doing UTF-8 decoding and string splits.
151    if !key.contains(&b'@') {
152        return None;
153    }
154
155    let key_str = std::str::from_utf8(key).ok()?;
156    let session = key_str
157        .split('@')
158        .find_map(|part| part.strip_prefix("session%"))?;
159    Some(format!("lmcache-session:{session}").into_bytes())
160}
161
162#[inline(always)]
163pub(super) fn point_write_session_storage_prefix(key: &[u8]) -> Option<Bytes> {
164    if key.starts_with(b"s:") {
165        Some(session_route_prefix(key).to_vec())
166    } else {
167        None
168    }
169}
170
171#[inline(always)]
172pub(super) fn batch_derived_session_storage_prefix(keys: &[Bytes]) -> Option<Bytes> {
173    let first = derived_session_storage_prefix(keys.first()?.as_slice())?;
174    if keys[1..].iter().all(|key| {
175        derived_session_storage_prefix(key.as_slice()).as_deref() == Some(first.as_slice())
176    }) {
177        Some(first)
178    } else {
179        None
180    }
181}