use crate::redis_cmd::RedisCommand;
use crate::util::RedisUtil;
use hashbrown::HashMap;
pub struct Rato {}
impl Rato {
pub const DB_TAG: &'static str = "database";
pub const AUTH_TAG: &'static str = "auth_success";
#[inline]
pub fn make_pub_sub_cmd(channels: Vec<&str>) -> Vec<u8> {
let mut output = RedisUtil::make_array(channels.len());
for val in channels {
output.extend(RedisUtil::make_bulk(&val.as_bytes()));
}
output
}
pub fn parse_input<T>(
event_handler: &T,
mut conn_tags: &mut HashMap<String, String>,
input: &mut Vec<u8>,
) -> (Vec<u8>, bool)
where
T: ?Sized + RedisCommand,
{
let mut output = Vec::new();
let mut close = false;
let mut i = 0;
let mut argss = Vec::new();
debug!(
"Redis Command/Payload received: {}",
String::from_utf8_lossy(&input)
);
loop {
let (args, err, ni, complete) = RedisUtil::redcon_take_args(input, i);
if err != "" {
output.extend(format!("-{}\r\n", err).into_bytes());
close = true;
error!(
"Failed to parse redis response. Error:{}",
String::from_utf8_lossy(&output)
);
break;
} else if !complete {
break;
}
i = ni;
if !args.is_empty() {
argss.push(args);
}
}
debug!("argss len:{}, close: {}", argss.len(), close);
if !close && !argss.is_empty() {
for args in argss {
let mut close_session = false;
let hout =
Rato::handle_command(event_handler, &mut conn_tags, &args, &mut close_session);
output.extend_from_slice(hout.as_slice());
if close_session {
close = true;
break;
}
}
}
if i > 0 {
if i < input.len() {
let mut remain = Vec::with_capacity(input.len() - i);
remain.extend_from_slice(&input[i..input.len()]);
debug!("Remaning buffer:{}", String::from_utf8_lossy(&remain));
input.clear();
input.extend(remain)
} else {
input.clear()
}
}
debug!(
"Redis Command response: {}",
String::from_utf8_lossy(&output)
);
(output, close)
}
fn handle_command<T>(
event_handler: &T,
conn_tags: &mut HashMap<String, String>,
args: &[Vec<u8>],
close_session: &mut bool,
) -> Vec<u8>
where
T: ?Sized + RedisCommand,
{
let db_tag = conn_tags.get(Rato::DB_TAG);
let db = if db_tag.is_some() {
db_tag.unwrap().as_bytes().to_vec()
} else {
vec![]
};
let mut authorize_access = false;
let auth_status = conn_tags.get(Rato::AUTH_TAG);
if RedisUtil::arg_match(&args[0], "AUTH")
|| (auth_status.is_some() && auth_status.unwrap() == "true")
{
authorize_access = true;
}
if !authorize_access {
return b"-ERR You must authenticate\r\n".to_vec();
}
debug!(
"DB:{}, Command:{}, Number of args:{}",
String::from_utf8_lossy(&db),
String::from_utf8_lossy(&args[0]),
args.len()
);
if RedisUtil::arg_match(&args[0], "AUTH") {
match args.len() {
2 => {
if let Err(e) = event_handler.on_cmd_auth(&db, &args[1]) {
conn_tags.insert(Rato::AUTH_TAG.to_string(), "false".to_string());
format!("-ERR Login Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec()
} else {
conn_tags.insert(Rato::AUTH_TAG.to_string(), "true".to_string());
b"+OK\r\n".to_vec()
}
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else if RedisUtil::arg_match(&args[0], "SELECT") {
match args.len() {
2 => {
conn_tags.insert(
Rato::DB_TAG.to_string(),
String::from_utf8_lossy(&args[1]).to_string(),
);
b"+OK\r\n".to_vec()
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else if RedisUtil::arg_match(&args[0], "PING") {
match args.len() {
1 => {
let v = vec![];
event_handler.on_cmd_ping(&v);
b"+PONG\r\n".to_vec()
}
2 => {
event_handler.on_cmd_ping(&args[1]);
RedisUtil::make_bulk(&args[1])
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else if RedisUtil::arg_match(&args[0], "ECHO") {
match args.len() {
2 => {
event_handler.on_cmd_echo(&args[1]);
RedisUtil::make_bulk(&args[1])
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else if RedisUtil::arg_match(&args[0], "SET") {
match args.len() {
3 => {
if let Err(e) = event_handler.on_cmd_set(&db, &args[1], &args[2]) {
format!("-ERR SET Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec()
} else {
b"+OK\r\n".to_vec()
}
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else if RedisUtil::arg_match(&args[0], "HMSET") {
match args.len() {
1 | 2 | 3 => RedisUtil::invalid_num_args(&args[0]),
_ => {
let mut kv = HashMap::with_capacity(args.len());
for i in (2..args.len()).step_by(2) {
if i + 1 < args.len() {
debug!(
"HMSET: Key: {}, Val:{}",
String::from_utf8_lossy(&args[i]),
String::from_utf8_lossy(&args[i + 1])
);
kv.insert(&args[i], &args[i + 1]);
} else {
return RedisUtil::invalid_num_args(&args[0]);
}
}
if let Err(e) = event_handler.on_cmd_hmset(&db, &args[1], &kv) {
format!("-ERR HMSET Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec()
} else {
b"+OK\r\n".to_vec()
}
}
}
} else if RedisUtil::arg_match(&args[0], "HSET") {
info!("HSET :{}", args.len());
match args.len() {
4 => {
if let Err(e) = event_handler.on_cmd_hset(&db, &args[1], &args[2], &args[3]) {
error!("HSET :{}", e.to_string());
format!("-ERR HSET Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec()
} else {
b"+OK\r\n".to_vec()
}
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else if RedisUtil::arg_match(&args[0], "FLUSHDB") {
(if let Err(e) = event_handler.on_cmd_flushdb(&db) {
format!("-ERR FLUSHDB Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec()
} else {
b"+OK\r\n".to_vec()
})
} else if RedisUtil::arg_match(&args[0], "BACKUPDB") {
(match args.len() {
1 => {
if let Err(e) = event_handler.on_cmd_backupdb(&[]) {
format!("-ERR Backup DB Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec()
} else {
b"+OK\r\n".to_vec()
}
}
2 => {
if let Err(e) = event_handler.on_cmd_backupdb(&args[1]) {
format!("-ERR Backup DB Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec()
} else {
b"+OK\r\n".to_vec()
}
}
_ => RedisUtil::invalid_num_args(&args[0]),
})
} else if RedisUtil::arg_match(&args[0], "BACKUP_LRU_KEYS") {
(match args.len() {
1 => {
if let Err(e) = event_handler.on_cmd_backup_lru_keys(&[]) {
format!(
"-ERR BACKUP_LRU_KEYS Lru Keys Failed '{}'\r\n",
e.to_string()
)
.into_bytes()
.to_vec()
} else {
b"+OK\r\n".to_vec()
}
}
2 => {
if let Err(e) = event_handler.on_cmd_backup_lru_keys(&args[1]) {
format!(
"-ERR BACKUP_LRU_KEYS Lru Keys Failed '{}'\r\n",
e.to_string()
)
.into_bytes()
.to_vec()
} else {
b"+OK\r\n".to_vec()
}
}
_ => RedisUtil::invalid_num_args(&args[0]),
})
} else if RedisUtil::arg_match(&args[0], "DEL") {
match args.len() {
2 => {
if let Err(_e) = event_handler.on_cmd_del(&db, &args[1]) {
b":0\r\n".to_vec()
} else {
b":1\r\n".to_vec()
}
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else if RedisUtil::arg_match(&args[0], "GET") {
match args.len() {
2 => {
match event_handler.on_cmd_get(&db, &args[1]) {
Ok(v) => RedisUtil::make_bulk(&v),
Err(_e) => {
b"$-1\r\n".to_vec()
}
}
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else if RedisUtil::arg_match(&args[0], "HMGET") {
match args.len() {
1 | 2 => RedisUtil::invalid_num_args(&args[0]),
_ => {
match event_handler.on_cmd_hmget(&db, &args[1], &args[2..]) {
Ok(v) => {
let mut output = RedisUtil::make_array(v.len());
for val in v {
output.extend(RedisUtil::make_bulk(&val));
}
output
}
Err(_e) => {
b"$-1\r\n".to_vec()
}
}
}
}
} else if RedisUtil::arg_match(&args[0], "HGET") {
match args.len() {
3 => {
match event_handler.on_cmd_hget(&db, &args[1], &args[2]) {
Ok(kv) => RedisUtil::make_bulk(&kv),
Err(_e) => {
b"$-1\r\n".to_vec()
}
}
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else if RedisUtil::arg_match(&args[0], "HGETALL") {
match args.len() {
2 => {
match event_handler.on_cmd_hgetall(&db, &args[1]) {
Ok(v) => {
let mut output = RedisUtil::make_array(v.len() * 2);
for (key, val) in v {
output.extend(RedisUtil::make_bulk(&key));
output.extend(RedisUtil::make_bulk(&val));
}
output
}
Err(_e) => {
b"$-1\r\n".to_vec()
}
}
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else if RedisUtil::arg_match(&args[0], "KEYS") {
format!(
"-ERR not supported command '{}'\r\n",
RedisUtil::safe_line_from_slice(&args[0])
)
.into_bytes()
.to_vec()
} else if RedisUtil::arg_match(&args[0], "QUIT") {
if let Err(_e) = event_handler.on_cmd_quit() {
b":0\r\n".to_vec()
} else {
*close_session = true;
b"+OK\r\n".to_vec()
}
} else if RedisUtil::arg_match(&args[0], "MESSAGE") {
debug!("Received Command MESSAGE");
match args.len() {
1 | 2 => RedisUtil::invalid_num_args(&args[0]),
_ => {
if let Err(_e) = event_handler.on_cmd_msg(&args[1], &args[2]) {
vec![]
} else {
vec![]
}
}
}
} else if RedisUtil::arg_match(&args[0], "SUBSCRIBE") {
match args.len() {
3 => {
if let Err(_e) = event_handler.on_cmd_subscribe(&args[1]) {
vec![]
} else {
vec![]
}
}
4 => {
for arg in args.iter() {
debug!("Subscription response: {}", String::from_utf8_lossy(&arg));
}
if let Err(_e) = event_handler.on_cmd_subscribe_response(&args[1], &args[2]) {
vec![]
} else {
vec![]
}
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else if RedisUtil::arg_match(&args[0], "CLUSTER") {
match args.len() {
2 => {
if RedisUtil::arg_match(&args[1], "NODES") {
match event_handler.on_cmd_cluster_nodes() {
Ok(v) => RedisUtil::make_bulk(&v),
Err(e) => format!("-ERR CLUSTER NODES Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec(),
}
} else if RedisUtil::arg_match(&args[1], "SLOTS") {
match event_handler.on_cmd_cluster_slots() {
Ok(v) => v,
Err(e) => format!("-ERR CLUSTER SLOTS Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec(),
}
} else {
RedisUtil::invalid_num_args(&args[0])
}
}
4 => {
if RedisUtil::arg_match(&args[1], "NODES")
&& RedisUtil::arg_match(&args[2], "REMOVE")
{
if let Err(e) = event_handler.on_cmd_cluster_remove_node(&args[3]) {
format!("-ERR CLUSTER NODES REMOVE Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec()
} else {
b"+OK\r\n".to_vec()
}
} else {
format!(
"-ERR invalid command '{}'\r\n",
RedisUtil::safe_line_from_slice(&args[0])
)
.into_bytes()
.to_vec()
}
}
13 => {
if !RedisUtil::arg_match(&args[1], "NODES") {
return format!(
"-ERR invalid command '{}'\r\n",
RedisUtil::safe_line_from_slice(&args[0])
)
.into_bytes()
.to_vec();
}
let mut kv = HashMap::with_capacity(args.len());
for i in (3..args.len()).step_by(2) {
if i + 1 < args.len() {
debug!(
"CLUSTER NODES ADD/UPDATE: Key: {}, Val:{}",
String::from_utf8_lossy(&args[i]),
String::from_utf8_lossy(&args[i + 1])
);
kv.insert(&args[i], &args[i + 1]);
} else {
return RedisUtil::invalid_num_args(&args[0]);
}
}
if RedisUtil::arg_match(&args[2], "ADD") {
if let Err(e) = event_handler.on_cmd_cluster_add_node(&kv) {
format!("-ERR CLUSTER NODES ADD Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec()
} else {
b"+OK\r\n".to_vec()
}
} else if RedisUtil::arg_match(&args[2], "UPDATE") {
if let Err(e) = event_handler.on_cmd_cluster_update_node(&kv) {
format!("-ERR CLUSTER NODES UPDATE Failed '{}'\r\n", e.to_string())
.into_bytes()
.to_vec()
} else {
b"+OK\r\n".to_vec()
}
} else {
format!(
"-ERR invalid command '{}'\r\n",
RedisUtil::safe_line_from_slice(&args[0])
)
.into_bytes()
.to_vec()
}
}
_ => RedisUtil::invalid_num_args(&args[0]),
}
} else {
format!(
"-ERR unknown command '{}'\r\n",
RedisUtil::safe_line_from_slice(&args[0])
)
.into_bytes()
.to_vec()
}
}
}