shardmap 0.3.0

Sharded embedded in-memory map with optional cache, protocol, and server internals
Documentation
use std::collections::BTreeSet;

use crate::commands::redis::{
    array_bulk, bulk, error, int, optional_string_value, wrong_arity, wrongtype,
};
use crate::protocol::Frame;
use crate::storage::{EmbeddedStore, RedisStringStore};

/// Marker bytes that open every value produced by `encode_hll`; `decode_hll`
/// rejects any stored value that does not start with them.
const HLL_PREFIX: &[u8] = b"FC:HLL:v1\0";

/// Declares one command: a unit marker struct, its `CommandSpec` impl, and a
/// `pub(crate)` static instance of the marker.
///
/// Arguments, in order:
/// 1. the marker type's identifier,
/// 2. the identifier of the static instance,
/// 3. the command name literal (becomes `CommandSpec::NAME`),
/// 4. whether the command mutates the stored value (becomes
///    `CommandSpec::MUTATES_VALUE`).
macro_rules! define_hll_command {
    ($marker:ident, $instance:ident, $command:literal, $writes:expr) => {
        #[derive(Debug, Clone, Copy)]
        pub(crate) struct $marker;

        impl crate::commands::CommandSpec for $marker {
            const NAME: &'static str = $command;
            const MUTATES_VALUE: bool = $writes;
        }

        pub(crate) static $instance: $marker = $marker;
    };
}

// Marker types and static instances for the PF* commands. The last argument
// sets `CommandSpec::MUTATES_VALUE`: `true` for PFADD and PFMERGE, which write
// a value back to the store, and `false` for the remaining commands.
define_hll_command!(PFAdd, PFADD_COMMAND, "PFADD", true);
define_hll_command!(PFCount, PFCOUNT_COMMAND, "PFCOUNT", false);
define_hll_command!(PFMerge, PFMERGE_COMMAND, "PFMERGE", true);
define_hll_command!(PFDebug, PFDEBUG_COMMAND, "PFDEBUG", false);
define_hll_command!(PFSelfTest, PFSELFTEST_COMMAND, "PFSELFTEST", false);

impl crate::commands::redis::RedisCommand for PFAdd {
    /// Handles `PFADD key [element ...]`.
    ///
    /// The value stored at `key` is decoded into its member set (an absent
    /// value decodes to the empty set), every element is inserted, and the
    /// re-encoded set is handed back to the store.
    ///
    /// Replies with integer `1` when the value was created or at least one new
    /// member was added, and `0` otherwise. With no arguments at all the reply
    /// is the arity error; a decode failure or a store-level error frame is
    /// returned as-is.
    fn execute(store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
        let [key, elements @ ..] = args else {
            return wrong_arity("PFADD");
        };
        let outcome = store.transform_string_value_no_ttl(
            key,
            |existing| {
                // Redis reports 1 when PFADD creates the structure, even if no
                // elements were supplied, so a missing previous value counts
                // as a change on its own.
                let created = existing.is_none();
                let mut members = decode_hll(existing)?;
                let before = members.len();
                members.extend(elements.iter().map(|element| element.to_vec()));
                let changed = created || members.len() != before;
                Ok((i64::from(changed), encode_hll(&members)))
            },
            wrongtype,
        );
        match outcome {
            Ok(changed) => int(changed),
            Err(frame) => frame,
        }
    }
}

impl crate::commands::redis::RedisCommand for PFCount {
    /// Handles `PFCOUNT key [key ...]`.
    ///
    /// Decodes the member set stored at every given key, unions them, and
    /// replies with the number of distinct members. Keys with no stored value
    /// contribute nothing. The first lookup or decode error frame encountered
    /// is returned immediately; an empty argument list yields the arity error.
    fn execute(store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
        if args.is_empty() {
            return wrong_arity("PFCOUNT");
        }
        let mut union = BTreeSet::new();
        for key in args {
            let stored = match optional_string_value(store, key, true) {
                Ok(Some(bytes)) => bytes,
                // Nothing stored under this key: it adds no members.
                Ok(None) => continue,
                Err(frame) => return frame,
            };
            match decode_hll(Some(&stored)) {
                Ok(decoded) => union.extend(decoded),
                Err(frame) => return frame,
            }
        }
        int(union.len() as i64)
    }
}

impl crate::commands::redis::RedisCommand for PFMerge {
    /// Handles `PFMERGE destkey [sourcekey ...]`.
    ///
    /// Unions the member sets stored at every source key with the set already
    /// stored at `destkey` (if any) and writes the result to `destkey`. This
    /// follows Redis, where an existing destination is treated as one of the
    /// sources rather than being overwritten.
    ///
    /// Replies `OK` on success. Any source or destination value that fails to
    /// decode produces that decode error frame, so a destination holding a
    /// non-HLL string is rejected instead of being silently replaced. An empty
    /// argument list yields the arity error.
    fn execute(store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
        let [dest, sources @ ..] = args else {
            return wrong_arity("PFMERGE");
        };
        let mut members: BTreeSet<Vec<u8>> = BTreeSet::new();
        for source in sources {
            let stored = match optional_string_value(store, source, true) {
                Ok(Some(value)) => value,
                // A missing source contributes no members.
                Ok(None) => continue,
                Err(frame) => return frame,
            };
            match decode_hll(Some(&stored)) {
                Ok(values) => members.extend(values),
                Err(frame) => return frame,
            }
        }
        let written = store.transform_string_value_no_ttl(
            dest,
            |existing| {
                // Fold the destination's current members in inside the
                // transform, so the read and the write go through one call.
                let mut merged = decode_hll(existing)?;
                merged.extend(members);
                Ok(((), encode_hll(&merged)))
            },
            wrongtype,
        );
        match written {
            Ok(()) => crate::commands::redis::simple("OK"),
            Err(frame) => frame,
        }
    }
}

impl crate::commands::redis::RedisCommand for PFDebug {
    fn execute(store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
        match args {
            [sub, key] if sub.eq_ignore_ascii_case(b"ENCODING") => {
                match optional_string_value(store, key, true) {
                    Ok(Some(value)) => match decode_hll(Some(&value)) {
                        Ok(_) => bulk(b"shardcache-exact".to_vec()),
                        Err(frame) => frame,
                    },
                    Ok(None) => bulk(b"empty".to_vec()),
                    Err(frame) => frame,
                }
            }
            [sub, key] if sub.eq_ignore_ascii_case(b"DECODE") => {
                match optional_string_value(store, key, true) {
                    Ok(Some(value)) => match decode_hll(Some(&value)) {
                        Ok(values) => array_bulk(
                            values
                                .into_iter()
                                .map(|member| {
                                    let mut line = b"member ".to_vec();
                                    line.extend_from_slice(&member);
                                    line
                                })
                                .collect(),
                        ),
                        Err(frame) => frame,
                    },
                    Ok(None) => Frame::Array(Vec::new()),
                    Err(frame) => frame,
                }
            }
            [sub, key] if sub.eq_ignore_ascii_case(b"GETREG") => {
                match optional_string_value(store, key, true) {
                    Ok(Some(value)) => match decode_hll(Some(&value)) {
                        Ok(values) => Frame::Array(
                            values
                                .into_iter()
                                .map(
                                    |member| int((xxhash_rust::xxh3::xxh3_64(&member) & 63) as i64),
                                )
                                .collect(),
                        ),
                        Err(frame) => frame,
                    },
                    Ok(None) => Frame::Array(Vec::new()),
                    Err(frame) => frame,
                }
            }
            _ => error("ERR unknown PFDEBUG subcommand or wrong number of arguments"),
        }
    }
}

impl crate::commands::redis::RedisCommand for PFSelfTest {
    /// Handles `PFSELFTEST`.
    ///
    /// No checks are run and the store is not consulted: the reply is `OK`
    /// when no arguments are given and the arity error otherwise.
    fn execute(_store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
        if args.is_empty() {
            crate::commands::redis::simple("OK")
        } else {
            wrong_arity("PFSELFTEST")
        }
    }
}

fn decode_hll(existing: Option<&[u8]>) -> Result<BTreeSet<Vec<u8>>, Frame> {
    let Some(value) = existing else {
        return Ok(BTreeSet::new());
    };
    if !value.starts_with(HLL_PREFIX) {
        return Err(error(
            "WRONGTYPE Key is not a valid HyperLogLog string value.",
        ));
    }
    let mut cursor = HLL_PREFIX.len();
    let count = read_u32(value, &mut cursor)? as usize;
    let mut members = BTreeSet::new();
    for _ in 0..count {
        let len = read_u32(value, &mut cursor)? as usize;
        let end = cursor
            .checked_add(len)
            .ok_or_else(|| error("WRONGTYPE Key is not a valid HyperLogLog string value."))?;
        if end > value.len() {
            return Err(error(
                "WRONGTYPE Key is not a valid HyperLogLog string value.",
            ));
        }
        members.insert(value[cursor..end].to_vec());
        cursor = end;
    }
    if cursor != value.len() {
        return Err(error(
            "WRONGTYPE Key is not a valid HyperLogLog string value.",
        ));
    }
    Ok(members)
}

/// Serialises a member set into the stored byte layout.
///
/// Output is `HLL_PREFIX`, the member count as a little-endian `u32`, then
/// each member (in the set's iteration order) as a little-endian `u32` length
/// followed by the member's bytes.
fn encode_hll(members: &BTreeSet<Vec<u8>>) -> Vec<u8> {
    // Rough size hint only; the buffer grows if members are longer than assumed.
    let reserve = HLL_PREFIX.len().saturating_add(members.len() * 12);
    let mut buf = Vec::with_capacity(reserve);

    buf.extend_from_slice(HLL_PREFIX);
    buf.extend_from_slice(&(members.len() as u32).to_le_bytes());
    members.iter().for_each(|entry| {
        buf.extend_from_slice(&(entry.len() as u32).to_le_bytes());
        buf.extend_from_slice(entry);
    });
    buf
}

fn read_u32(value: &[u8], cursor: &mut usize) -> Result<u32, Frame> {
    let end = cursor
        .checked_add(4)
        .ok_or_else(|| error("WRONGTYPE Key is not a valid HyperLogLog string value."))?;
    let bytes = value
        .get(*cursor..end)
        .ok_or_else(|| error("WRONGTYPE Key is not a valid HyperLogLog string value."))?;
    *cursor = end;
    Ok(u32::from_le_bytes(
        bytes.try_into().expect("slice length was checked"),
    ))
}