redis-on-mysql 0.0.1

A Redis-compatible proxy that stores all data and Pub/Sub state in MySQL
Documentation
use bytes::Bytes;
use resp_async::response::RespError;
use resp_async::{Cmd, LocalAddr, Value};

use crate::handlers::util::{arg_as_bytes, crc16, ok, wrong_arity};
use crate::state::SessionHandle;

pub async fn cluster(
    Cmd(cmd): Cmd,
    LocalAddr(local_addr): LocalAddr,
    SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
    if cmd.args.is_empty() {
        return Err(wrong_arity("CLUSTER"));
    }
    let _auth = session.auth().await.ok_or(RespError::NoAuth)?;
    let mut sub = arg_as_bytes(&cmd.args[0])?.to_vec();
    for b in &mut sub {
        b.make_ascii_uppercase();
    }
    match sub.as_slice() {
        b"INFO" => Ok(Value::Bulk(Bytes::from(cluster_info()))),
        b"NODES" => Ok(Value::Bulk(Bytes::from(cluster_nodes(&local_addr)))),
        b"SLOTS" => Ok(cluster_slots(&local_addr)),
        b"KEYSLOT" => {
            if cmd.args.len() != 2 {
                return Err(wrong_arity("CLUSTER"));
            }
            let key = arg_as_bytes(&cmd.args[1])?;
            let slot = key_slot(key.as_ref());
            Ok(Value::Integer(slot as i64))
        }
        _ => Err(RespError::invalid_data(
            "ERR This instance has cluster support disabled",
        )),
    }
}

pub async fn readonly(SessionHandle(session): SessionHandle) -> Result<Value, RespError> {
    let _auth = session.auth().await.ok_or(RespError::NoAuth)?;
    Ok(ok())
}

pub async fn readwrite(SessionHandle(session): SessionHandle) -> Result<Value, RespError> {
    let _auth = session.auth().await.ok_or(RespError::NoAuth)?;
    Ok(ok())
}

pub async fn asking(SessionHandle(session): SessionHandle) -> Result<Value, RespError> {
    let _auth = session.auth().await.ok_or(RespError::NoAuth)?;
    Ok(ok())
}

fn cluster_info() -> Vec<u8> {
    let mut info = String::new();
    info.push_str("cluster_state:ok\r\n");
    info.push_str("cluster_slots_assigned:16384\r\n");
    info.push_str("cluster_slots_ok:16384\r\n");
    info.push_str("cluster_slots_pfail:0\r\n");
    info.push_str("cluster_slots_fail:0\r\n");
    info.push_str("cluster_known_nodes:1\r\n");
    info.push_str("cluster_size:1\r\n");
    info.push_str("cluster_current_epoch:0\r\n");
    info.push_str("cluster_my_epoch:0\r\n");
    info.push_str("cluster_stats_messages_sent:0\r\n");
    info.push_str("cluster_stats_messages_received:0\r\n");
    info.into_bytes()
}

fn cluster_nodes(addr: &std::net::SocketAddr) -> Vec<u8> {
    let node_id = node_id_for(addr);
    let line = format!(
        "{id} {addr} myself,master - 0 0 0 connected 0-16383\n",
        id = node_id,
        addr = addr
    );
    line.into_bytes()
}

fn cluster_slots(addr: &std::net::SocketAddr) -> Value {
    let host = Bytes::from(addr.ip().to_string());
    let node_id = Bytes::from(node_id_for(addr));
    Value::Array(vec![Value::Array(vec![
        Value::Integer(0),
        Value::Integer(16383),
        Value::Array(vec![
            Value::Bulk(host),
            Value::Integer(addr.port() as i64),
            Value::Bulk(node_id),
        ]),
    ])])
}

fn node_id_for(addr: &std::net::SocketAddr) -> String {
    let base = format!("{:04x}", crc16(addr.to_string().as_bytes()));
    let mut out = String::with_capacity(40);
    for _ in 0..10 {
        out.push_str(&base);
    }
    out
}

fn key_slot(key: &[u8]) -> u16 {
    let key = match hash_tag(key) {
        Some(tag) => tag,
        None => key,
    };
    crc16(key) % 16384
}

fn hash_tag(key: &[u8]) -> Option<&[u8]> {
    let start = key.iter().position(|&b| b == b'{')?;
    let end = key[start + 1..].iter().position(|&b| b == b'}')?;
    if end == 0 {
        return None;
    }
    Some(&key[start + 1..start + 1 + end])
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn keyslot_hash_tag() {
        let slot = key_slot(b"foo{bar}baz");
        let slot2 = key_slot(b"bar");
        assert_eq!(slot, slot2);
    }
}