shardmap 0.2.1

Sharded embedded in-memory map with optional cache, protocol, and server internals
Documentation
use super::{EnabledModuleInfo, RedisModuleCommand, module_command_error};
use crate::commands::redis::{
    bulk, eq_ignore_ascii_case, error, int, parse_f64, parse_i64, parse_usize, simple, wrong_arity,
};
use crate::protocol::Frame;
use crate::storage::EmbeddedStore;

pub(super) const MODULES: &[EnabledModuleInfo] = &[EnabledModuleInfo {
    name: "topk",
    version: 1,
}];

pub(super) const COMMANDS: &[RedisModuleCommand] = redis_module_commands![
    "topk";
    "TOPK.ADD" => true,
    "TOPK.COUNT" => false,
    "TOPK.INCRBY" => true,
    "TOPK.INFO" => false,
    "TOPK.LIST" => false,
    "TOPK.QUERY" => false,
    "TOPK.RESERVE" => true,
];

#[cfg(feature = "redis-module-topk")]
pub(super) fn execute(name: &str, store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
    match name {
        name if eq_ignore_ascii_case(name.as_bytes(), b"TOPK.RESERVE") => topk_reserve(store, args),
        name if eq_ignore_ascii_case(name.as_bytes(), b"TOPK.ADD") => topk_add(store, args),
        name if eq_ignore_ascii_case(name.as_bytes(), b"TOPK.INCRBY") => topk_incrby(store, args),
        name if eq_ignore_ascii_case(name.as_bytes(), b"TOPK.QUERY") => topk_query(store, args),
        name if eq_ignore_ascii_case(name.as_bytes(), b"TOPK.COUNT") => topk_count(store, args),
        name if eq_ignore_ascii_case(name.as_bytes(), b"TOPK.LIST") => topk_list(store, args),
        name if eq_ignore_ascii_case(name.as_bytes(), b"TOPK.INFO") => topk_info(store, args),
        _ => module_command_error("topk"),
    }
}

#[cfg(feature = "redis-module-topk")]
fn topk_reserve(store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
    let ([key, k] | [key, k, _, _, _]) = args else {
        return wrong_arity("TOPK.RESERVE");
    };
    let k = match parse_usize(k) {
        Ok(k) => k,
        Err(()) => return error("ERR invalid topk value"),
    };
    let (width, depth, decay) = match args {
        [_, _] => (8, 7, 0.9),
        [_, _, width, depth, decay] => {
            let width = match parse_usize(width) {
                Ok(width) => width,
                Err(()) => return error("ERR invalid width value"),
            };
            let depth = match parse_usize(depth) {
                Ok(depth) => depth,
                Err(()) => return error("ERR invalid depth value"),
            };
            let decay = match parse_f64(decay) {
                Ok(decay) => decay,
                Err(()) => return error("ERR invalid decay value"),
            };
            (width, depth, decay)
        }
        _ => unreachable!(),
    };
    match store.topk_reserve(key, k, width, depth, decay) {
        Ok(()) => simple("OK"),
        Err(crate::storage::TopKError::AlreadyExists) => error("ERR item exists"),
        Err(crate::storage::TopKError::WrongType) => error(crate::storage::WRONGTYPE_MESSAGE),
        Err(crate::storage::TopKError::InvalidArgument) => {
            error("ERR invalid TOPK.RESERVE argument")
        }
        Err(crate::storage::TopKError::MissingKey) => error("ERR no such key"),
    }
}

#[cfg(feature = "redis-module-topk")]
fn topk_add(store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
    let Some((key, items)) = args.split_first() else {
        return wrong_arity("TOPK.ADD");
    };
    if items.is_empty() {
        return wrong_arity("TOPK.ADD");
    }
    topk_dropped_array(store.topk_add(key, items))
}

#[cfg(feature = "redis-module-topk")]
fn topk_incrby(store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
    let Some((key, rest)) = args.split_first() else {
        return wrong_arity("TOPK.INCRBY");
    };
    if rest.is_empty() || rest.len() % 2 != 0 {
        return wrong_arity("TOPK.INCRBY");
    }
    let mut updates = Vec::with_capacity(rest.len() / 2);
    for pair in rest.chunks_exact(2) {
        let increment = match parse_i64(pair[1]) {
            Ok(increment) => increment,
            Err(()) => return error("ERR invalid increment"),
        };
        updates.push((pair[0].to_vec(), increment));
    }
    topk_dropped_array(store.topk_incrby(key, &updates))
}

#[cfg(feature = "redis-module-topk")]
fn topk_query(store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
    let Some((key, items)) = args.split_first() else {
        return wrong_arity("TOPK.QUERY");
    };
    if items.is_empty() {
        return wrong_arity("TOPK.QUERY");
    }
    match store.topk_query(key, items) {
        Ok(values) => Frame::Array(
            values
                .into_iter()
                .map(|value| int(if value { 1 } else { 0 }))
                .collect(),
        ),
        Err(err) => topk_error_frame(err),
    }
}

#[cfg(feature = "redis-module-topk")]
fn topk_count(store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
    let Some((key, items)) = args.split_first() else {
        return wrong_arity("TOPK.COUNT");
    };
    if items.is_empty() {
        return wrong_arity("TOPK.COUNT");
    }
    match store.topk_counts(key, items) {
        Ok(values) => Frame::Array(values.into_iter().map(int).collect()),
        Err(err) => topk_error_frame(err),
    }
}

#[cfg(feature = "redis-module-topk")]
fn topk_list(store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
    let ([key] | [key, _]) = args else {
        return wrong_arity("TOPK.LIST");
    };
    let with_count = match args {
        [_] => false,
        [_, option] if eq_ignore_ascii_case(option, b"WITHCOUNT") => true,
        [_, _] => return error("ERR syntax error"),
        _ => unreachable!(),
    };
    match store.topk_list(key) {
        Ok(entries) => {
            let mut frames = Vec::with_capacity(if with_count {
                entries.len() * 2
            } else {
                entries.len()
            });
            for (item, count) in entries {
                frames.push(bulk(item));
                if with_count {
                    frames.push(int(count));
                }
            }
            Frame::Array(frames)
        }
        Err(err) => topk_error_frame(err),
    }
}

#[cfg(feature = "redis-module-topk")]
fn topk_info(store: &EmbeddedStore, args: &[&[u8]]) -> Frame {
    let [key] = args else {
        return wrong_arity("TOPK.INFO");
    };
    match store.topk_info(key) {
        Ok(info) => Frame::Array(vec![
            simple("k"),
            int(info.k as i64),
            simple("width"),
            int(info.width as i64),
            simple("depth"),
            int(info.depth as i64),
            simple("decay"),
            simple(&format!("{:.17}", info.decay)),
        ]),
        Err(err) => topk_error_frame(err),
    }
}

#[cfg(feature = "redis-module-topk")]
fn topk_dropped_array(
    result: std::result::Result<Vec<Option<Vec<u8>>>, crate::storage::TopKError>,
) -> Frame {
    match result {
        Ok(values) => Frame::Array(
            values
                .into_iter()
                .map(|value| value.map(bulk).unwrap_or(Frame::Null))
                .collect(),
        ),
        Err(err) => topk_error_frame(err),
    }
}

#[cfg(feature = "redis-module-topk")]
fn topk_error_frame(err: crate::storage::TopKError) -> Frame {
    match err {
        crate::storage::TopKError::AlreadyExists => error("ERR item exists"),
        crate::storage::TopKError::MissingKey => error("ERR no such key"),
        crate::storage::TopKError::WrongType => error(crate::storage::WRONGTYPE_MESSAGE),
        crate::storage::TopKError::InvalidArgument => error("ERR invalid TopK argument"),
    }
}