fast-cache 0.1.0

Embedded-first thread-per-core in-memory cache with optional Redis-compatible server
Documentation
//! SET command parsing and execution.
//!
//! SET-specific behavior starts here. Transport-specific implementations,
//! option parsing, and write-path selection live in `set/*.rs` submodules.

mod engine;
#[cfg(feature = "server")]
mod fcnp;
mod options;
#[cfg(feature = "server")]
mod server;
#[cfg(feature = "server")]
mod storage;

use crate::Result;
use crate::protocol::{FastCommand, Frame};
#[cfg(feature = "server")]
use crate::server::commands::{BorrowedCommandContext, DirectCommandContext};
#[cfg(feature = "server")]
use crate::storage::EmbeddedStore;
use crate::storage::{Command, EngineCommandContext, EngineFrameFuture, RESP_SPANNED_VALUE_MIN};

use super::DecodedFastCommand;
use super::parsing::CommandArity;
use options::StorageSetOptions;

/// Command spec registered in the command catalogs.
pub(crate) struct Set;
pub(crate) static COMMAND: Set = Set;

/// Owned SET payload used when a RESP frame has already been copied out.
#[derive(Debug, Clone)]
pub(crate) struct OwnedSet {
    key: Vec<u8>,
    value: Vec<u8>,
    ttl_ms: Option<u64>,
}

impl OwnedSet {
    fn new(key: Vec<u8>, value: Vec<u8>, ttl_ms: Option<u64>) -> Self {
        Self { key, value, ttl_ms }
    }
}

impl super::OwnedCommandData for OwnedSet {
    type Spec = Set;

    fn route_key(&self) -> Option<&[u8]> {
        Some(&self.key)
    }

    fn to_borrowed_command(&self) -> super::BorrowedCommandBox<'_> {
        Box::new(BorrowedSet::new(&self.key, &self.value, self.ttl_ms))
    }
}

/// Borrowed SET payload used on zero-copy request paths.
#[derive(Debug, Clone, Copy)]
pub(crate) struct BorrowedSet<'a> {
    key: &'a [u8],
    value: &'a [u8],
    ttl_ms: Option<u64>,
}

impl<'a> BorrowedSet<'a> {
    fn new(key: &'a [u8], value: &'a [u8], ttl_ms: Option<u64>) -> Self {
        Self { key, value, ttl_ms }
    }
}

impl<'a> super::BorrowedCommandData<'a> for BorrowedSet<'a> {
    type Spec = Set;

    fn route_key(&self) -> Option<&'a [u8]> {
        Some(self.key)
    }

    fn supports_spanned_resp(&self) -> bool {
        self.value.len() >= RESP_SPANNED_VALUE_MIN
    }

    fn to_owned_command(&self) -> Command {
        Command::new(Box::new(OwnedSet::new(
            self.key.to_vec(),
            self.value.to_vec(),
            self.ttl_ms,
        )))
    }

    fn execute_engine<'b>(&'b self, ctx: EngineCommandContext<'b>) -> EngineFrameFuture<'b>
    where
        'a: 'b,
    {
        let key = self.key;
        let value = self.value;
        let ttl_ms = self.ttl_ms;
        Box::pin(async move { Set::execute_engine_frame(ctx, key, value, ttl_ms).await })
    }

    #[cfg(feature = "server")]
    fn execute_borrowed_frame(&self, store: &EmbeddedStore, _now_ms: u64) -> Frame {
        store.set(self.key.to_vec(), self.value.to_vec(), self.ttl_ms);
        Frame::SimpleString("OK".into())
    }

    #[cfg(feature = "server")]
    fn execute_borrowed(&self, ctx: BorrowedCommandContext<'_, '_, '_>) {
        match ctx.single_threaded && !ctx.store.has_redis_objects() {
            true => {
                // SAFETY: forwarded from caller's single-worker contract.
                unsafe {
                    ctx.store
                        .set_single_threaded(self.key, self.value, self.ttl_ms)
                };
            }
            false => {
                ctx.store
                    .set(self.key.to_vec(), self.value.to_vec(), self.ttl_ms);
            }
        }
        ctx.out.extend_from_slice(b"+OK\r\n");
    }

    #[cfg(feature = "server")]
    fn execute_direct_borrowed(&self, ctx: DirectCommandContext) -> Frame {
        ctx.set_owned(self.key.to_vec(), self.value.to_vec(), self.ttl_ms);
        Frame::SimpleString("OK".into())
    }
}

impl super::CommandSpec for Set {
    const NAME: &'static str = "SET";
    const MUTATES_VALUE: bool = true;
}

impl super::OwnedCommandParse for Set {
    fn parse_owned(parts: &[Vec<u8>]) -> Result<Command> {
        CommandArity::<Self>::at_least(parts.len(), 3, "key and value")?;
        let options = StorageSetOptions::parse(parts[3..].iter().map(Vec::as_slice))?;
        Ok(Command::new(Box::new(OwnedSet::new(
            parts[1].clone(),
            parts[2].clone(),
            options.ttl_ms,
        ))))
    }
}

impl<'a> super::BorrowedCommandParse<'a> for Set {
    fn parse_borrowed(parts: &[&'a [u8]]) -> Result<super::BorrowedCommandBox<'a>> {
        CommandArity::<Self>::at_least(parts.len(), 3, "key and value")?;
        let options = StorageSetOptions::parse(parts[3..].iter().copied())?;
        Ok(Box::new(BorrowedSet::new(
            parts[1],
            parts[2],
            options.ttl_ms,
        )))
    }
}

impl DecodedFastCommand for Set {
    fn matches_decoded_fast(&self, command: &FastCommand<'_>) -> bool {
        matches!(command, FastCommand::Set { .. })
    }
}