tinyredis 1.0.0

A Redis-compatible server written in Rust. Uses RESP2, persists writes to an append-only file, and accepts connections from any standard Redis client.
Documentation
use std::sync::Arc;

use bytes::Bytes;
use tokio::sync::Mutex;

use crate::commands::utils::{aof_log, parse_i64, to_str};
use crate::error::{Error, Result};
use crate::parser::{Command, Frame};
use crate::persistence::AofSender;
use crate::store::{ListDirection, Store};

type SharedStore = Arc<Mutex<Store>>;

pub async fn lpush(cmd: &Command, store: &SharedStore, aof: &Option<AofSender>) -> Result<Frame> {
    if cmd.args.len() < 2 {
        return Err(Error::WrongArity("LPUSH"));
    }
    let key = to_str(cmd.arg(0, "LPUSH")?, "LPUSH")?;
    let values: Vec<Bytes> = cmd.args[1..].to_vec();

    let len = store.lock().await.lpush(&key, &values)?;

    aof_log(aof, build_frame("LPUSH", &key, &values)).await;
    Ok(Frame::Integer(len as i64))
}

pub async fn rpush(cmd: &Command, store: &SharedStore, aof: &Option<AofSender>) -> Result<Frame> {
    if cmd.args.len() < 2 {
        return Err(Error::WrongArity("RPUSH"));
    }
    let key = to_str(cmd.arg(0, "RPUSH")?, "RPUSH")?;
    let values: Vec<Bytes> = cmd.args[1..].to_vec();

    let len = store.lock().await.rpush(&key, &values)?;

    aof_log(aof, build_frame("RPUSH", &key, &values)).await;
    Ok(Frame::Integer(len as i64))
}

pub async fn lpop(cmd: &Command, store: &SharedStore, aof: &Option<AofSender>) -> Result<Frame> {
    if cmd.args.is_empty() {
        return Err(Error::WrongArity("LPOP"));
    }
    let key = to_str(cmd.arg(0, "LPOP")?, "LPOP")?;
    let with_count = cmd.args.len() > 1;
    let count = if with_count {
        parse_i64(cmd.arg(1, "LPOP")?)
            .map_err(|_| Error::Protocol("value is not an integer or out of range".into()))
            .and_then(|n| {
                if n < 0 {
                    Err(Error::Protocol(
                        "value is not an integer or out of range".into(),
                    ))
                } else {
                    Ok(n as usize)
                }
            })?
    } else {
        1
    };

    let result = store.lock().await.lpop(&key, count)?;

    match result {
        None => Ok(Frame::Null),
        Some(mut items) => {
            aof_log(
                aof,
                Frame::Array(vec![
                    Frame::Bulk(Bytes::from_static(b"LPOP")),
                    Frame::Bulk(Bytes::from(key)),
                    Frame::Integer(items.len() as i64),
                ]),
            )
            .await;
            if with_count {
                Ok(Frame::Array(items.into_iter().map(Frame::Bulk).collect()))
            } else {
                Ok(Frame::Bulk(items.remove(0)))
            }
        }
    }
}

pub async fn rpop(cmd: &Command, store: &SharedStore, aof: &Option<AofSender>) -> Result<Frame> {
    if cmd.args.is_empty() {
        return Err(Error::WrongArity("RPOP"));
    }
    let key = to_str(cmd.arg(0, "RPOP")?, "RPOP")?;
    let with_count = cmd.args.len() > 1;
    let count = if with_count {
        parse_i64(cmd.arg(1, "RPOP")?)
            .map_err(|_| Error::Protocol("value is not an integer or out of range".into()))
            .and_then(|n| {
                if n < 0 {
                    Err(Error::Protocol(
                        "value is not an integer or out of range".into(),
                    ))
                } else {
                    Ok(n as usize)
                }
            })?
    } else {
        1
    };

    let result = store.lock().await.rpop(&key, count)?;

    match result {
        None => Ok(Frame::Null),
        Some(mut items) => {
            aof_log(
                aof,
                Frame::Array(vec![
                    Frame::Bulk(Bytes::from_static(b"RPOP")),
                    Frame::Bulk(Bytes::from(key)),
                    Frame::Integer(items.len() as i64),
                ]),
            )
            .await;
            if with_count {
                Ok(Frame::Array(items.into_iter().map(Frame::Bulk).collect()))
            } else {
                Ok(Frame::Bulk(items.remove(0)))
            }
        }
    }
}

pub async fn llen(cmd: &Command, store: &SharedStore) -> Result<Frame> {
    let key = to_str(cmd.arg(0, "LLEN")?, "LLEN")?;
    let len = store.lock().await.llen(&key)?;
    Ok(Frame::Integer(len as i64))
}

pub async fn lrange(cmd: &Command, store: &SharedStore) -> Result<Frame> {
    if cmd.args.len() < 3 {
        return Err(Error::WrongArity("LRANGE"));
    }
    let key = to_str(cmd.arg(0, "LRANGE")?, "LRANGE")?;
    let start = parse_i64(cmd.arg(1, "LRANGE")?)?;
    let stop = parse_i64(cmd.arg(2, "LRANGE")?)?;

    let items = store.lock().await.lrange(&key, start, stop)?;
    Ok(Frame::Array(items.into_iter().map(Frame::Bulk).collect()))
}

pub async fn lindex(cmd: &Command, store: &SharedStore) -> Result<Frame> {
    if cmd.args.len() < 2 {
        return Err(Error::WrongArity("LINDEX"));
    }
    let key = to_str(cmd.arg(0, "LINDEX")?, "LINDEX")?;
    let index = parse_i64(cmd.arg(1, "LINDEX")?)?;

    match store.lock().await.lindex(&key, index)? {
        None => Ok(Frame::Null),
        Some(v) => Ok(Frame::Bulk(v)),
    }
}

pub async fn lset(cmd: &Command, store: &SharedStore, aof: &Option<AofSender>) -> Result<Frame> {
    if cmd.args.len() < 3 {
        return Err(Error::WrongArity("LSET"));
    }
    let key = to_str(cmd.arg(0, "LSET")?, "LSET")?;
    let index = parse_i64(cmd.arg(1, "LSET")?)?;
    let value = cmd.arg(2, "LSET")?.clone();

    store.lock().await.lset(&key, index, value.clone())?;

    aof_log(
        aof,
        Frame::Array(vec![
            Frame::Bulk(Bytes::from_static(b"LSET")),
            Frame::Bulk(Bytes::from(key)),
            Frame::Integer(index),
            Frame::Bulk(value),
        ]),
    )
    .await;
    Ok(Frame::Simple("OK".into()))
}

pub async fn linsert(cmd: &Command, store: &SharedStore, aof: &Option<AofSender>) -> Result<Frame> {
    if cmd.args.len() < 4 {
        return Err(Error::WrongArity("LINSERT"));
    }
    let key = to_str(cmd.arg(0, "LINSERT")?, "LINSERT")?;
    let where_str = to_str(cmd.arg(1, "LINSERT")?, "LINSERT")?.to_uppercase();
    let before = match where_str.as_str() {
        "BEFORE" => true,
        "AFTER" => false,
        _ => return Err(Error::Protocol("syntax error".into())),
    };
    let pivot = cmd.arg(2, "LINSERT")?.clone();
    let element = cmd.arg(3, "LINSERT")?.clone();

    let result = store
        .lock()
        .await
        .linsert(&key, before, &pivot, element.clone())?;

    if result > 0 {
        aof_log(
            aof,
            Frame::Array(vec![
                Frame::Bulk(Bytes::from_static(b"LINSERT")),
                Frame::Bulk(Bytes::from(key)),
                Frame::Bulk(Bytes::from(where_str)),
                Frame::Bulk(pivot),
                Frame::Bulk(element),
            ]),
        )
        .await;
    }
    Ok(Frame::Integer(result))
}

pub async fn lrem(cmd: &Command, store: &SharedStore, aof: &Option<AofSender>) -> Result<Frame> {
    if cmd.args.len() < 3 {
        return Err(Error::WrongArity("LREM"));
    }
    let key = to_str(cmd.arg(0, "LREM")?, "LREM")?;
    let count = parse_i64(cmd.arg(1, "LREM")?)?;
    let element = cmd.arg(2, "LREM")?.clone();

    let removed = store.lock().await.lrem(&key, count, &element)?;

    if removed > 0 {
        aof_log(
            aof,
            Frame::Array(vec![
                Frame::Bulk(Bytes::from_static(b"LREM")),
                Frame::Bulk(Bytes::from(key)),
                Frame::Integer(count),
                Frame::Bulk(element),
            ]),
        )
        .await;
    }
    Ok(Frame::Integer(removed as i64))
}

pub async fn ltrim(cmd: &Command, store: &SharedStore, aof: &Option<AofSender>) -> Result<Frame> {
    if cmd.args.len() < 3 {
        return Err(Error::WrongArity("LTRIM"));
    }
    let key = to_str(cmd.arg(0, "LTRIM")?, "LTRIM")?;
    let start = parse_i64(cmd.arg(1, "LTRIM")?)?;
    let stop = parse_i64(cmd.arg(2, "LTRIM")?)?;

    store.lock().await.ltrim(&key, start, stop)?;

    aof_log(
        aof,
        Frame::Array(vec![
            Frame::Bulk(Bytes::from_static(b"LTRIM")),
            Frame::Bulk(Bytes::from(key)),
            Frame::Integer(start),
            Frame::Integer(stop),
        ]),
    )
    .await;
    Ok(Frame::Simple("OK".into()))
}

pub async fn lmove(cmd: &Command, store: &SharedStore, aof: &Option<AofSender>) -> Result<Frame> {
    if cmd.args.len() < 4 {
        return Err(Error::WrongArity("LMOVE"));
    }
    let src = to_str(cmd.arg(0, "LMOVE")?, "LMOVE")?;
    let dst = to_str(cmd.arg(1, "LMOVE")?, "LMOVE")?;
    let wherefrom = parse_direction(cmd.arg(2, "LMOVE")?)?;
    let whereto = parse_direction(cmd.arg(3, "LMOVE")?)?;

    let result = store.lock().await.lmove(&src, &dst, wherefrom, whereto)?;

    match result {
        None => Ok(Frame::Null),
        Some(element) => {
            aof_log(
                aof,
                Frame::Array(vec![
                    Frame::Bulk(Bytes::from_static(b"LMOVE")),
                    Frame::Bulk(Bytes::from(src)),
                    Frame::Bulk(Bytes::from(dst)),
                    Frame::Bulk(Bytes::from(direction_str(wherefrom))),
                    Frame::Bulk(Bytes::from(direction_str(whereto))),
                ]),
            )
            .await;
            Ok(Frame::Bulk(element))
        }
    }
}

fn parse_direction(b: &Bytes) -> Result<ListDirection> {
    match b.to_ascii_uppercase().as_slice() {
        b"LEFT" => Ok(ListDirection::Left),
        b"RIGHT" => Ok(ListDirection::Right),
        _ => Err(Error::Protocol("syntax error".into())),
    }
}

fn direction_str(d: ListDirection) -> &'static str {
    match d {
        ListDirection::Left => "LEFT",
        ListDirection::Right => "RIGHT",
    }
}

fn build_frame(cmd_name: &'static str, key: &str, values: &[Bytes]) -> Frame {
    let mut parts = vec![
        Frame::Bulk(Bytes::from_static(cmd_name.as_bytes())),
        Frame::Bulk(Bytes::from(key.to_string())),
    ];
    parts.extend(values.iter().cloned().map(Frame::Bulk));
    Frame::Array(parts)
}