use bytes::Bytes;
use resp_async::response::RespError;
use resp_async::{Cmd, LocalAddr, Value};
use crate::handlers::util::{arg_as_bytes, crc16, ok, wrong_arity};
use crate::state::SessionHandle;
/// Handler for the `CLUSTER` command family.
///
/// The first argument selects the subcommand and is compared
/// case-insensitively (ASCII only). Supported subcommands:
///
/// * `INFO`    – bulk reply built by `cluster_info`.
/// * `NODES`   – bulk reply built by `cluster_nodes` for the local address.
/// * `SLOTS`   – array reply built by `cluster_slots` for the local address.
/// * `KEYSLOT` – integer reply with the slot of the single key argument.
///
/// # Errors
///
/// * The `wrong_arity("CLUSTER")` error when no subcommand is given, or when
///   `KEYSLOT` is not called with exactly one key.
/// * `RespError::NoAuth` when `session.auth()` yields `None`.
/// * Whatever `arg_as_bytes` reports for an argument it cannot convert.
/// * `ERR This instance has cluster support disabled` for every other
///   subcommand.
pub async fn cluster(
    Cmd(cmd): Cmd,
    LocalAddr(local_addr): LocalAddr,
    SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
    // Arity is checked before authentication, so a bare `CLUSTER` always
    // reports the arity error regardless of the session state.
    if cmd.args.is_empty() {
        return Err(wrong_arity("CLUSTER"));
    }
    let _auth = session.auth().await.ok_or(RespError::NoAuth)?;

    // Work on an owned, upper-cased copy so matching ignores ASCII case
    // without touching the original argument.
    let mut subcommand = arg_as_bytes(&cmd.args[0])?.to_vec();
    subcommand.make_ascii_uppercase();

    let reply = match subcommand.as_slice() {
        b"INFO" => Value::Bulk(Bytes::from(cluster_info())),
        b"NODES" => Value::Bulk(Bytes::from(cluster_nodes(&local_addr))),
        b"SLOTS" => cluster_slots(&local_addr),
        b"KEYSLOT" => {
            // Exactly `CLUSTER KEYSLOT <key>`: subcommand plus one key.
            if cmd.args.len() != 2 {
                return Err(wrong_arity("CLUSTER"));
            }
            let key = arg_as_bytes(&cmd.args[1])?;
            Value::Integer(i64::from(key_slot(key.as_ref())))
        }
        _ => {
            return Err(RespError::invalid_data(
                "ERR This instance has cluster support disabled",
            ));
        }
    };
    Ok(reply)
}
/// Handler named for the `READONLY` command.
///
/// Performs no work beyond the authentication check: replies with the shared
/// `ok()` value when `session.auth()` yields a value.
///
/// # Errors
///
/// Returns `RespError::NoAuth` when `session.auth()` yields `None`.
pub async fn readonly(SessionHandle(session): SessionHandle) -> Result<Value, RespError> {
    match session.auth().await {
        Some(_auth) => Ok(ok()),
        None => Err(RespError::NoAuth),
    }
}
/// Handler named for the `READWRITE` command.
///
/// Performs no work beyond the authentication check: replies with the shared
/// `ok()` value when `session.auth()` yields a value.
///
/// # Errors
///
/// Returns `RespError::NoAuth` when `session.auth()` yields `None`.
pub async fn readwrite(SessionHandle(session): SessionHandle) -> Result<Value, RespError> {
    match session.auth().await {
        Some(_auth) => Ok(ok()),
        None => Err(RespError::NoAuth),
    }
}
/// Handler named for the `ASKING` command.
///
/// Performs no work beyond the authentication check: replies with the shared
/// `ok()` value when `session.auth()` yields a value.
///
/// # Errors
///
/// Returns `RespError::NoAuth` when `session.auth()` yields `None`.
pub async fn asking(SessionHandle(session): SessionHandle) -> Result<Value, RespError> {
    match session.auth().await {
        Some(_auth) => Ok(ok()),
        None => Err(RespError::NoAuth),
    }
}
/// Builds the payload returned by `CLUSTER INFO`.
///
/// The report is a fixed list of `name:value` lines, each terminated by
/// `\r\n`, describing one node with all 16384 slots assigned and healthy.
fn cluster_info() -> Vec<u8> {
    // The report never varies, so it is assembled at compile time.
    const REPORT: &str = concat!(
        "cluster_state:ok\r\n",
        "cluster_slots_assigned:16384\r\n",
        "cluster_slots_ok:16384\r\n",
        "cluster_slots_pfail:0\r\n",
        "cluster_slots_fail:0\r\n",
        "cluster_known_nodes:1\r\n",
        "cluster_size:1\r\n",
        "cluster_current_epoch:0\r\n",
        "cluster_my_epoch:0\r\n",
        "cluster_stats_messages_sent:0\r\n",
        "cluster_stats_messages_received:0\r\n",
    );
    REPORT.as_bytes().to_vec()
}
/// Builds the payload returned by `CLUSTER NODES`.
///
/// Emits a single `\n`-terminated line for this server: the node id derived
/// from `addr`, the address itself, the flags `myself,master`, a `-`
/// placeholder, three zero counters, the word `connected`, and the slot range
/// `0-16383`.
fn cluster_nodes(addr: &std::net::SocketAddr) -> Vec<u8> {
    format!(
        "{id} {addr} myself,master - 0 0 0 connected 0-16383\n",
        id = node_id_for(addr),
        addr = addr
    )
    .into_bytes()
}
/// Builds the reply returned by `CLUSTER SLOTS`.
///
/// The result is an array holding exactly one slot-range entry of the form
/// `[0, 16383, [ip, port, node_id]]`, where the IP and port come from `addr`
/// and the node id from `node_id_for(addr)`.
fn cluster_slots(addr: &std::net::SocketAddr) -> Value {
    let ip = Bytes::from(addr.ip().to_string());
    let id = Bytes::from(node_id_for(addr));

    // Description of the only node: address, port and identifier.
    let node = Value::Array(vec![
        Value::Bulk(ip),
        Value::Integer(i64::from(addr.port())),
        Value::Bulk(id),
    ]);

    // One range covering every slot, served by that node.
    let full_range = Value::Array(vec![Value::Integer(0), Value::Integer(16383), node]);

    Value::Array(vec![full_range])
}
/// Derives a node identifier from a socket address.
///
/// The CRC16 of the address's textual form is rendered as zero-padded
/// lowercase hex (at least four digits) and that chunk is repeated ten times,
/// so the same address always maps to the same id.
fn node_id_for(addr: &std::net::SocketAddr) -> String {
    let checksum = crc16(addr.to_string().as_bytes());
    format!("{:04x}", checksum).repeat(10)
}
/// Maps a key to one of the 16384 hash slots.
///
/// When `hash_tag` finds a tag inside the key, only the tag is hashed;
/// otherwise the whole key is. The slot is the CRC16 of those bytes modulo
/// 16384.
fn key_slot(key: &[u8]) -> u16 {
    let hashed = hash_tag(key).unwrap_or(key);
    crc16(hashed) % 16384
}
/// Extracts the hash tag of a key, if it has one.
///
/// The tag is the text between the first `{` in the key and the first `}`
/// that follows it. Returns `None` when there is no `{`, when no `}` follows
/// it, or when nothing lies between the two (e.g. `foo{}bar`); in those cases
/// the caller hashes the whole key.
fn hash_tag(key: &[u8]) -> Option<&[u8]> {
    // Everything after the first `{`; `None` when the key has no `{` at all.
    let after_open = key.splitn(2, |&b| b == b'{').nth(1)?;
    let close = after_open.iter().position(|&b| b == b'}')?;
    match &after_open[..close] {
        // `{}` carries no tag.
        [] => None,
        tag => Some(tag),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A key containing `{bar}` must hash to the same slot as the bare key
    /// `bar`, because only the tag is hashed.
    #[test]
    fn keyslot_hash_tag() {
        assert_eq!(key_slot(b"foo{bar}baz"), key_slot(b"bar"));
    }
}