use super::*;
impl EmbeddedStore {
pub fn delete(&self, key: &[u8]) -> bool {
let now_ms = now_millis();
let route = self.route_key(key);
self.delete_routed_then(route, key, now_ms, || {})
}
pub(crate) fn delete_routed_then(
&self,
route: EmbeddedKeyRoute,
key: &[u8],
now_ms: u64,
after_delete: impl FnOnce(),
) -> bool {
let route = match route.shard_id < self.shards.len() {
true => route,
false => self.route_key(key),
};
if self.objects.has_objects() {
let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
let mut shard = self.shards[route.shard_id].write();
let deleted_object = bucket.delete_any(key);
if deleted_object {
self.objects.note_deleted(route.shard_id);
}
let deleted_session = if let Some(session_prefix) = derived_session_storage_prefix(key)
{
shard
.session_slots
.delete_hashed(&session_prefix, route.key_hash, key)
} else {
false
};
let deleted_map = shard.map.delete_hashed(route.key_hash, key, now_ms);
let deleted = deleted_object || deleted_session || deleted_map;
if deleted {
after_delete();
}
return deleted;
}
let mut shard = self.shards[route.shard_id].write();
if let Some(session_prefix) = derived_session_storage_prefix(key)
&& shard
.session_slots
.delete_hashed(&session_prefix, route.key_hash, key)
{
after_delete();
return true;
}
let deleted = shard.map.delete_hashed(route.key_hash, key, now_ms);
if deleted {
after_delete();
}
deleted
}
pub fn exists(&self, key: &[u8]) -> bool {
let route = self.route_key(key);
if self.objects.has_objects() {
let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
if bucket.object_is_expired(key, now_millis()) {
drop(bucket);
let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
if bucket.delete_expired(key, now_millis()) {
self.objects.note_deleted(route.shard_id);
}
return self.get(key).is_some();
}
if bucket.contains_object(key) {
return true;
}
}
self.get(key).is_some()
}
pub fn ttl_seconds(&self, key: &[u8]) -> i64 {
let route = self.route_key(key);
let now_ms = now_millis();
if self.objects.has_objects() {
let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
if bucket.delete_expired(key, now_ms) {
self.objects.note_deleted(route.shard_id);
return -2;
}
let ttl = bucket.ttl_millis(key, now_ms);
if ttl != -2 {
return if ttl < 0 { ttl } else { (ttl + 999) / 1_000 };
}
}
let mut shard = self.shards[route.shard_id].write();
if let Some(session_prefix) = derived_session_storage_prefix(key)
&& shard
.session_slots
.get_ref_hashed(&session_prefix, route.key_hash, key)
.is_some()
{
return -1;
}
shard.map.ttl_seconds(key, now_ms)
}
pub fn pttl_millis(&self, key: &[u8]) -> i64 {
let route = self.route_key(key);
let now_ms = now_millis();
if self.objects.has_objects() {
let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
if bucket.delete_expired(key, now_ms) {
self.objects.note_deleted(route.shard_id);
return -2;
}
let ttl = bucket.ttl_millis(key, now_ms);
if ttl != -2 {
return ttl;
}
}
let mut shard = self.shards[route.shard_id].write();
if let Some(session_prefix) = derived_session_storage_prefix(key)
&& shard
.session_slots
.get_ref_hashed(&session_prefix, route.key_hash, key)
.is_some()
{
return -1;
}
shard.map.ttl_millis(key, now_ms)
}
pub fn persist(&self, key: &[u8]) -> bool {
let route = self.route_key(key);
let now_ms = now_millis();
if self.objects.has_objects() {
let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
if bucket.delete_expired(key, now_ms) {
self.objects.note_deleted(route.shard_id);
return false;
}
let persisted = bucket.persist(key, now_ms);
if persisted {
return true;
}
if bucket.contains_object(key) {
return false;
}
}
let mut shard = self.shards[route.shard_id].write();
if let Some(session_prefix) = derived_session_storage_prefix(key)
&& shard
.session_slots
.get_ref_hashed(&session_prefix, route.key_hash, key)
.is_some()
{
return false;
}
shard.map.persist(key, now_ms)
}
pub fn expire(&self, key: &[u8], expire_at_ms: u64) -> bool {
let route = self.route_key(key);
let now_ms = now_millis();
self.expire_routed_then(route, key, expire_at_ms, now_ms, || {})
}
pub(crate) fn expire_routed_then(
&self,
route: EmbeddedKeyRoute,
key: &[u8],
expire_at_ms: u64,
now_ms: u64,
after_expire: impl FnOnce(),
) -> bool {
let route = match route.shard_id < self.shards.len() {
true => route,
false => self.route_key(key),
};
if self.objects.has_objects() {
let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
if bucket.delete_expired(key, now_ms) {
self.objects.note_deleted(route.shard_id);
return false;
}
if bucket.expire(key, expire_at_ms, now_ms) {
after_expire();
return true;
}
if bucket.contains_object(key) {
return false;
}
}
let mut shard = self.shards[route.shard_id].write();
if let Some(session_prefix) = derived_session_storage_prefix(key)
&& shard
.session_slots
.get_ref_hashed(&session_prefix, route.key_hash, key)
.is_some()
{
return false;
}
let changed = shard.map.expire(key, expire_at_ms, now_ms);
if changed {
after_expire();
}
changed
}
pub fn redis_type(&self, key: &[u8]) -> &'static str {
let route = self.route_key(key);
if self.objects.has_objects() {
let now_ms = now_millis();
let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
if bucket.object_is_expired(key, now_ms) {
drop(bucket);
let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
if bucket.delete_expired(key, now_ms) {
self.objects.note_deleted(route.shard_id);
}
return if self.get_value_bytes(key).is_some() {
"string"
} else {
"none"
};
}
if let Some(kind) = bucket.type_name(key) {
return kind;
}
}
if self.get_value_bytes(key).is_some() {
"string"
} else {
"none"
}
}
pub fn object_encoding(&self, key: &[u8]) -> Option<&'static str> {
let route = self.route_key(key);
if self.objects.has_objects() {
let now_ms = now_millis();
let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
if bucket.object_is_expired(key, now_ms) {
drop(bucket);
let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
if bucket.delete_expired(key, now_ms) {
self.objects.note_deleted(route.shard_id);
}
return self.get_value_bytes(key).map(|_| "raw");
}
if let Some(encoding) = bucket.encoding(key) {
return Some(encoding);
}
}
self.get_value_bytes(key).map(|_| "raw")
}
#[cfg(feature = "embedded")]
pub fn shard_stats_snapshot(&self) -> Vec<ShardStatsSnapshot> {
self.shards
.iter()
.enumerate()
.map(|(shard_id, shard)| {
let shard = shard.read();
let (hot, warm, cold) = shard.map.stats_snapshot();
let reads = hot
.hits
.saturating_add(hot.misses)
.saturating_add(warm.hits)
.saturating_add(warm.misses)
.saturating_add(cold.hits)
.saturating_add(cold.misses);
let expired = hot
.expirations
.saturating_add(warm.expirations)
.saturating_add(cold.expirations);
ShardStatsSnapshot {
shard_id,
key_count: shard.map.len().saturating_add(shard.session_slots.len()),
reads,
writes: 0,
deletes: 0,
expired,
maintenance_runs: 0,
hot,
warm,
cold,
}
})
.collect()
}
#[cfg(feature = "embedded")]
pub fn stats_snapshot(&self) -> (TierStatsSnapshot, TierStatsSnapshot, TierStatsSnapshot) {
let mut hot = TierStatsSnapshot {
name: "hot",
..TierStatsSnapshot::default()
};
let mut warm = TierStatsSnapshot {
name: "warm",
..TierStatsSnapshot::default()
};
let mut cold = TierStatsSnapshot {
name: "cold",
..TierStatsSnapshot::default()
};
for shard in &self.shards {
let shard = shard.read();
let (shard_hot, shard_warm, shard_cold) = shard.map.stats_snapshot();
accumulate_tier_stats(&mut hot, &shard_hot);
accumulate_tier_stats(&mut warm, &shard_warm);
accumulate_tier_stats(&mut cold, &shard_cold);
}
(hot, warm, cold)
}
pub fn process_maintenance(&self) -> usize {
let now_ms = now_millis();
self.shards
.iter()
.map(|shard| {
let mut shard = shard.write();
shard.map.process_maintenance(now_ms)
})
.sum()
}
pub fn restore_entries<I>(&self, entries: I)
where
I: IntoIterator<Item = StoredEntry>,
{
let now_ms = now_millis();
for entry in entries {
if entry
.expire_at_ms
.is_some_and(|expire_at_ms| expire_at_ms <= now_ms)
{
continue;
}
let route = self.route_key(&entry.key);
let mut shard = self.shards[route.shard_id].write();
if let Some(session_prefix) = derived_session_storage_prefix(&entry.key) {
shard
.session_slots
.delete_hashed(&session_prefix, route.key_hash, &entry.key);
}
shard.map.set_hashed(
route.key_hash,
entry.key,
entry.value,
entry.expire_at_ms,
now_ms,
);
shard.enforce_memory_limit(now_ms);
}
}
}