shardmap 0.1.0

Sharded embedded in-memory map with optional cache, protocol, and server internals
Documentation
use super::*;
use crate::server::commands::{RawCommandContext, RawDirectCommand};

impl DirectProtocol {
    #[cfg(feature = "embedded")]
    #[inline(always)]
    #[allow(dead_code)]
    pub(in crate::server) fn try_resp_direct_dispatch<'a>(
        buf: &'a [u8],
    ) -> Option<(usize, RespDirectCommand<'a>)> {
        let (pos, command, args) = DirectProtocol::try_resp_command_parts(buf)?;
        DirectProtocol::parse_resp_direct_command(command, args).map(|command| (pos, command))
    }

    #[inline(always)]
    pub(in crate::server) fn try_resp_command_parts<'a>(
        buf: &'a [u8],
    ) -> Option<(usize, &'a [u8], RespDirectArgs<'a>)> {
        let (arg_count, mut pos) = DirectProtocol::read_resp_array_header(buf)?;
        match RespDirectArgCount::from_raw(arg_count) {
            RespDirectArgCount::Supported => {
                let command = DirectProtocol::read_resp_bulk_arg(buf, &mut pos)?;
                let mut args = RespDirectArgs::new();
                for _ in 1..arg_count {
                    args.push(DirectProtocol::read_resp_bulk_arg(buf, &mut pos)?);
                }
                Some((pos, command, args))
            }
            RespDirectArgCount::Unsupported => None,
        }
    }

    #[cfg(feature = "embedded")]
    pub(in crate::server) fn parse_resp_direct_command<'a>(
        command: &'a [u8],
        args: RespDirectArgs<'a>,
    ) -> Option<RespDirectCommand<'a>> {
        RawCommandDispatcher::find(command).map(|handler| RespDirectCommand { handler, args })
    }

    #[cfg(feature = "embedded")]
    pub(in crate::server) fn parse_redis_opcode_direct_command<'a>(
        kind: crate::protocol::FastCommandKind,
        args: RespDirectArgs<'a>,
    ) -> Option<RespDirectCommand<'a>> {
        RawCommandDispatcher::find_redis_opcode(kind)
            .map(|handler| RespDirectCommand { handler, args })
    }
}

enum RespDirectArgCount {
    Supported,
    Unsupported,
}

impl RespDirectArgCount {
    fn from_raw(arg_count: usize) -> Self {
        match arg_count {
            1..=64 => Self::Supported,
            _ => Self::Unsupported,
        }
    }
}

#[cfg(feature = "embedded")]
pub(crate) struct RespDirectCommand<'a> {
    handler: &'static dyn RawDirectCommand,
    args: RespDirectArgs<'a>,
}

#[cfg(feature = "embedded")]
impl RespDirectCommand<'_> {
    pub(super) fn mutates_value(&self) -> bool {
        self.handler.mutates_value()
    }

    pub(super) fn try_execute_owned_shard(
        &self,
        store: &EmbeddedStore,
        out: &mut BytesMut,
        owned_shard_id: usize,
    ) -> bool {
        self.handler
            .execute_owned_shard(store, self.args.as_slice(), out, owned_shard_id)
    }

    fn execute(
        self,
        store: &EmbeddedStore,
        out: &mut BytesMut,
        fast_write_queue: Option<&mut FastWriteQueue>,
        single_threaded: bool,
        resp_protocol: RespProtocolVersion,
        _started_at: Instant,
    ) {
        self.handler.execute(RawCommandContext {
            store,
            args: self.args,
            out,
            fast_write_queue,
            single_threaded,
            resp_protocol,
        });
    }

    fn execute_fast(
        self,
        store: &EmbeddedStore,
        out: &mut BytesMut,
        fast_write_queue: Option<&mut FastWriteQueue>,
        single_threaded: bool,
        _started_at: Instant,
    ) {
        self.handler.execute_fast(RawCommandContext {
            store,
            args: self.args,
            out,
            fast_write_queue,
            single_threaded,
            resp_protocol: RespProtocolVersion::Resp2,
        });
    }
}

impl DirectProtocol {
    #[cfg(feature = "embedded")]
    #[inline(always)]
    pub(in crate::server) fn shared_execute_resp_direct_cmd_into(
        store: &EmbeddedStore,
        command: RespDirectCommand<'_>,
        out: &mut BytesMut,
        fast_write_queue: Option<&mut FastWriteQueue>,
        single_threaded: bool,
        resp_protocol: RespProtocolVersion,
        started_at: Instant,
    ) {
        command.execute(
            store,
            out,
            fast_write_queue,
            single_threaded,
            resp_protocol,
            started_at,
        );
    }

    #[cfg(feature = "embedded")]
    #[inline(always)]
    pub(in crate::server) fn shared_execute_fast_direct_cmd_into(
        store: &EmbeddedStore,
        command: RespDirectCommand<'_>,
        out: &mut BytesMut,
        fast_write_queue: Option<&mut FastWriteQueue>,
        single_threaded: bool,
        started_at: Instant,
    ) {
        command.execute_fast(store, out, fast_write_queue, single_threaded, started_at);
    }
}