use super::*;
impl FlatMap {
#[cfg(feature = "embedded")]
#[inline(always)]
pub fn set_hashed_local<K, V>(
&mut self,
hash: u64,
key: K,
value: V,
expire_at_ms: Option<u64>,
now_ms: u64,
) where
K: Into<Bytes>,
V: Into<Bytes>,
{
self.disable_fast_point_map();
#[cfg(feature = "telemetry")]
let start = self.telemetry.as_ref().map(|_| Instant::now());
let key = key.into();
let mut replacement = Some(SharedBytes::from(value.into()));
let access_tick = if self.eviction_policy == EvictionPolicy::None {
0
} else {
self.next_access_tick()
};
self.record_lru_touch(hash, access_tick);
#[cfg(feature = "telemetry")]
let written_len = replacement.as_ref().map_or(0, |bytes| bytes.len());
#[cfg(feature = "telemetry")]
let (key_delta, memory_delta): (isize, isize);
match self
.entries
.entry(hash, |entry| entry.matches(hash, &key), |entry| entry.hash)
{
hashbrown::hash_table::Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
let had_ttl = entry.expire_at_ms.is_some();
let previous_value_len = entry.value.len();
entry.value = replacement.take().unwrap();
#[cfg(feature = "telemetry")]
{
key_delta = 0isize;
memory_delta = entry.value.len() as isize - previous_value_len as isize;
}
entry.access.record_access(access_tick);
self.stored_bytes = self
.stored_bytes
.saturating_sub(previous_value_len)
.saturating_add(entry.value.len());
entry.expire_at_ms = expire_at_ms;
self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
}
hashbrown::hash_table::Entry::Vacant(vacant) => {
let key_len = key.len();
let value_len = replacement.as_ref().map_or(0, |bytes| bytes.len());
vacant.insert(FlatEntry {
hash,
key_tag: hash_key_tag_from_hash(hash),
key_len,
key: key.into_boxed_slice(),
value: replacement.take().unwrap(),
expire_at_ms,
access: EntryAccessMeta {
last_touch: access_tick,
frequency: 1,
},
});
self.stored_bytes = self
.stored_bytes
.saturating_add(key_len)
.saturating_add(value_len);
#[cfg(feature = "telemetry")]
{
key_delta = 1isize;
memory_delta = (key_len + value_len) as isize;
}
if expire_at_ms.is_some() {
self.ttl_entries = self.ttl_entries.saturating_add(1);
}
}
}
#[cfg(feature = "telemetry")]
self.record_set_metrics(written_len, key_delta, memory_delta, start);
self.enforce_memory_limit(now_ms);
}
#[cfg(feature = "embedded")]
#[inline(always)]
pub fn set_slice_hashed_local(
&mut self,
hash: u64,
key: &[u8],
value: &[u8],
expire_at_ms: Option<u64>,
now_ms: u64,
) {
let key_tag = hash_key_tag_from_hash(hash);
self.set_slice_hashed_tagged_local(hash, key_tag, key, value, expire_at_ms, now_ms);
}
#[cfg(feature = "embedded")]
#[inline(always)]
pub fn set_slice_hashed_tagged_no_ttl_local(
&mut self,
hash: u64,
key_tag: u64,
key: &[u8],
value: &[u8],
) {
debug_assert_eq!(key_tag, hash_key_tag_from_hash(hash));
self.disable_fast_point_map();
if !self.retired_values.is_empty() {
self.reclaim_retired_if_quiescent();
}
#[cfg(feature = "telemetry")]
let start = self.telemetry.as_ref().map(|_| Instant::now());
let has_active_readers = self.has_active_readers();
let should_touch_access = self.eviction_policy != EvictionPolicy::None;
let access_tick = if should_touch_access {
self.next_access_tick()
} else {
0
};
self.record_lru_touch(hash, access_tick);
let reuse_values =
should_reuse_value_buffer(value.len()) && !self.reusable_values.is_empty();
let mut reusable_values = if reuse_values {
mem::take(&mut self.reusable_values)
} else {
Vec::new()
};
let mut reusable_value_bytes = if reuse_values {
mem::take(&mut self.reusable_value_bytes)
} else {
0
};
#[cfg(feature = "telemetry")]
let written_len = value.len();
#[cfg(feature = "telemetry")]
let (key_delta, memory_delta): (isize, isize);
match self.entries.entry(
hash,
|entry| entry.matches_prepared(hash, key, key_tag),
|entry| entry.hash,
) {
hashbrown::hash_table::Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
let had_ttl = entry.expire_at_ms.is_some();
let previous_value_len = entry.value.len();
let should_replace_value = previous_value_len != value.len() || has_active_readers;
let mut retired_value = None;
if should_replace_value {
let new_value = if reuse_values {
shared_bytes_from_reusable_pool(
value,
&mut reusable_values,
&mut reusable_value_bytes,
)
} else {
shared_bytes_from_slice(value)
};
retired_value = Some(mem::replace(&mut entry.value, new_value));
} else {
#[cfg(feature = "unsafe")]
{
unsafe {
copy_hot_value_bytes(
entry.value.as_ptr().cast_mut(),
value.as_ptr(),
value.len(),
);
}
}
#[cfg(not(feature = "unsafe"))]
{
let current_value = mem::take(&mut entry.value);
match current_value.try_into_mut() {
Ok(mut writable) => {
writable[..].copy_from_slice(value);
entry.value = writable.freeze();
}
Err(current_value) => {
entry.value = if reuse_values {
shared_bytes_from_reusable_pool(
value,
&mut reusable_values,
&mut reusable_value_bytes,
)
} else {
shared_bytes_from_slice(value)
};
retired_value = Some(current_value);
}
}
}
}
#[cfg(feature = "telemetry")]
{
key_delta = 0isize;
memory_delta = entry.value.len() as isize - previous_value_len as isize;
}
if should_touch_access {
entry.access.record_access(access_tick);
}
if previous_value_len != entry.value.len() {
self.stored_bytes = self
.stored_bytes
.saturating_sub(previous_value_len)
.saturating_add(entry.value.len());
}
if had_ttl {
entry.expire_at_ms = None;
self.ttl_entries = self.ttl_entries.saturating_sub(1);
}
if let Some(old_value) = retired_value {
if has_active_readers {
self.retire_value(old_value);
} else if reuse_values {
recycle_value_into_pool(
old_value,
&mut reusable_values,
&mut reusable_value_bytes,
);
} else {
self.recycle_value(old_value);
}
}
}
hashbrown::hash_table::Entry::Vacant(vacant) => {
let key_len = key.len();
let value_len = value.len();
let stored_value = if reuse_values {
shared_bytes_from_reusable_pool(
value,
&mut reusable_values,
&mut reusable_value_bytes,
)
} else {
shared_bytes_from_slice(value)
};
vacant.insert(FlatEntry {
hash,
key_tag,
key_len,
key: key.to_vec().into_boxed_slice(),
value: stored_value,
expire_at_ms: None,
access: EntryAccessMeta {
last_touch: access_tick,
frequency: 1,
},
});
self.stored_bytes = self
.stored_bytes
.saturating_add(key_len)
.saturating_add(value_len);
#[cfg(feature = "telemetry")]
{
key_delta = 1isize;
memory_delta = (key_len + value_len) as isize;
}
}
}
if reuse_values {
self.reusable_values = reusable_values;
self.reusable_value_bytes = reusable_value_bytes;
}
#[cfg(feature = "telemetry")]
self.record_set_metrics(written_len, key_delta, memory_delta, start);
}
#[cfg(feature = "embedded")]
#[inline(always)]
pub fn set_slice_hashed_tagged_local(
&mut self,
hash: u64,
key_tag: u64,
key: &[u8],
value: &[u8],
expire_at_ms: Option<u64>,
now_ms: u64,
) {
debug_assert_eq!(key_tag, hash_key_tag_from_hash(hash));
self.disable_fast_point_map();
self.reclaim_retired_if_quiescent();
#[cfg(feature = "telemetry")]
let start = self.telemetry.as_ref().map(|_| Instant::now());
let has_active_readers = self.has_active_readers();
let should_touch_access = self.eviction_policy != EvictionPolicy::None;
let access_tick = if should_touch_access {
self.next_access_tick()
} else {
0
};
self.record_lru_touch(hash, access_tick);
#[cfg(feature = "telemetry")]
let written_len = value.len();
#[cfg(feature = "telemetry")]
let (key_delta, memory_delta): (isize, isize);
match self.entries.entry(
hash,
|entry| entry.matches_prepared(hash, key, key_tag),
|entry| entry.hash,
) {
hashbrown::hash_table::Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
let had_ttl = entry.expire_at_ms.is_some();
let previous_value_len = entry.value.len();
let mut retired_value = None;
if previous_value_len == value.len() && !has_active_readers {
let current_value = mem::take(&mut entry.value);
match current_value.try_into_mut() {
Ok(mut writable) => {
writable[..].copy_from_slice(value);
entry.value = writable.freeze();
}
Err(current_value) => {
entry.value = shared_bytes_from_slice(value);
retired_value = Some(current_value);
}
}
} else {
retired_value = Some(mem::replace(
&mut entry.value,
shared_bytes_from_slice(value),
));
}
#[cfg(feature = "telemetry")]
{
key_delta = 0isize;
memory_delta = entry.value.len() as isize - previous_value_len as isize;
}
if should_touch_access {
entry.access.record_access(access_tick);
}
if previous_value_len != entry.value.len() {
self.stored_bytes = self
.stored_bytes
.saturating_sub(previous_value_len)
.saturating_add(entry.value.len());
}
if had_ttl || expire_at_ms.is_some() {
entry.expire_at_ms = expire_at_ms;
self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
}
if let Some(old_value) = retired_value {
self.retire_value(old_value);
}
}
hashbrown::hash_table::Entry::Vacant(vacant) => {
let key_len = key.len();
let value_len = value.len();
vacant.insert(FlatEntry {
hash,
key_tag,
key_len,
key: key.to_vec().into_boxed_slice(),
value: shared_bytes_from_slice(value),
expire_at_ms,
access: EntryAccessMeta {
last_touch: access_tick,
frequency: 1,
},
});
self.stored_bytes = self
.stored_bytes
.saturating_add(key_len)
.saturating_add(value_len);
#[cfg(feature = "telemetry")]
{
key_delta = 1isize;
memory_delta = (key_len + value_len) as isize;
}
if expire_at_ms.is_some() {
self.ttl_entries = self.ttl_entries.saturating_add(1);
}
}
}
#[cfg(feature = "telemetry")]
self.record_set_metrics(written_len, key_delta, memory_delta, start);
self.enforce_memory_limit(now_ms);
}
}