shardmap 0.3.2

Sharded embedded in-memory map with optional cache, protocol, and server internals
Documentation
use super::super::*;

#[allow(dead_code)]
pub(crate) trait RedisKeyStore {
    fn clone_object_value(&self, key: &[u8]) -> Option<RedisObjectValue>;
    fn clone_pinned_vector_value(&self, key: &[u8]) -> Option<bytes::Bytes>;
    fn set_object_value(&self, key: &[u8], value: RedisObjectValue, ttl_ms: Option<u64>);
    fn set_pinned_vector_value(&self, key: &[u8], value: bytes::Bytes, ttl_ms: Option<u64>);
    fn rename_key(
        &self,
        source: &[u8],
        dest: &[u8],
        nx: bool,
    ) -> std::result::Result<bool, RedisObjectError>;
}

impl RedisKeyStore for EmbeddedStore {
    fn clone_object_value(&self, key: &[u8]) -> Option<RedisObjectValue> {
        let route = self.route_key(key);
        let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
        if bucket.has_expirations() {
            let now_ms = now_millis();
            if bucket.object_is_expired(key, now_ms) {
                drop(bucket);
                let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
                if bucket.delete_expired(key, now_ms) {
                    self.objects.note_deleted(route.shard_id);
                }
                return None;
            }
        }
        bucket.clone_value(key, now_millis())
    }

    fn clone_pinned_vector_value(&self, key: &[u8]) -> Option<bytes::Bytes> {
        let primary_route = self.route_key(key);
        let vector_route = self.route_vector_key(key);
        if primary_route.shard_id == vector_route.shard_id {
            return None;
        }
        let mut value = None;
        self.with_shared_value_bytes_routed(vector_route, key, &mut |bytes| {
            if bytes.starts_with(crate::storage::VECTOR_SET_PREFIX) {
                value = Some(bytes.clone());
            }
        });
        value
    }

    fn set_object_value(&self, key: &[u8], value: RedisObjectValue, ttl_ms: Option<u64>) {
        let Some(ttl_ms) = ttl_ms else {
            set_object_value_expire_at(self, key, value, None, 0);
            return;
        };
        let now_ms = now_millis();
        set_object_value_expire_at(
            self,
            key,
            value,
            Some(now_ms.saturating_add(ttl_ms)),
            now_ms,
        );
    }

    fn set_pinned_vector_value(&self, key: &[u8], value: bytes::Bytes, ttl_ms: Option<u64>) {
        let now_ms = now_millis();
        let primary_route = self.route_key(key);
        let vector_route = self.route_vector_key(key);
        if primary_route.shard_id != vector_route.shard_id {
            self.delete_routed_then(primary_route, key, now_ms, || {});
        }
        let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
        self.set_value_bytes_routed_expire_at(vector_route, key, value, expire_at_ms, now_ms);
    }

    fn rename_key(
        &self,
        source: &[u8],
        dest: &[u8],
        nx: bool,
    ) -> std::result::Result<bool, RedisObjectError> {
        if source == dest {
            if !self.exists(source) {
                return Err(RedisObjectError::MissingKey);
            }
            return Ok(!nx);
        }
        if nx && self.exists(dest) {
            return Ok(false);
        }
        let ttl_ms = match self.pttl_millis(source) {
            ttl if ttl >= 0 => Some(ttl as u64),
            _ => None,
        };
        if let Some(value) = self.get_value_bytes(source) {
            self.set_value_bytes(dest, value, ttl_ms);
            self.delete(source);
            return Ok(true);
        }
        if let Some(value) = self.clone_pinned_vector_value(source) {
            self.delete(dest);
            self.set_pinned_vector_value(dest, value, ttl_ms);
            self.delete(source);
            return Ok(true);
        }
        if let Some(value) = self.clone_object_value(source) {
            self.delete(dest);
            self.set_object_value(dest, value, ttl_ms);
            self.delete(source);
            return Ok(true);
        }
        Err(RedisObjectError::MissingKey)
    }
}

fn set_object_value_expire_at(
    store: &EmbeddedStore,
    key: &[u8],
    value: RedisObjectValue,
    expire_at_ms: Option<u64>,
    now_ms: u64,
) {
    let should_notify = match &value {
        RedisObjectValue::List(values) => !values.is_empty(),
        RedisObjectValue::ZSet(values) => !values.is_empty(),
        _ => false,
    };
    let route = store.route_key(key);
    store.delete_pinned_vector_value_if_distinct(route, key, now_ms);
    let mut bucket = store.objects.write_bucket(route.shard_id, route.key_hash);
    let mut shard = store.shards[route.shard_id].write();
    let had_object = bucket.contains_object(key);
    if let Some(session_prefix) = point_write_session_storage_prefix(key) {
        shard
            .session_slots
            .delete_hashed(&session_prefix, route.key_hash, key);
    }
    shard.map.delete_hashed(route.key_hash, key, now_ms);
    let created = bucket.insert_value(key.to_vec(), value);
    if created && !had_object {
        store.objects.note_created(route.shard_id);
    }
    if let Some(expire_at_ms) = expire_at_ms {
        bucket.expire(key, expire_at_ms, now_ms);
    }
    drop(shard);
    drop(bucket);
    if should_notify {
        store.notify_redis_object_shard(route.shard_id);
    }
}