use crate::bridge::envelope::{PhysicalPlan, Status};
use crate::bridge::physical_plan::KvOp;
use crate::control::server::dispatch_utils;
use crate::control::server::wal_dispatch;
use crate::control::state::SharedState;
use crate::types::VShardId;
use super::codec::RespValue;
use super::command::RespCommand;
use super::session::RespSession;
pub async fn execute(
cmd: &RespCommand,
session: &mut RespSession,
state: &SharedState,
) -> RespValue {
match cmd.name.as_str() {
"PING" => handle_ping(cmd),
"ECHO" => handle_echo(cmd),
"SELECT" => handle_select(cmd, session),
"DBSIZE" => handle_dbsize(session, state).await,
"GET" => super::handler_kv::handle_get(cmd, session, state).await,
"SET" => super::handler_kv::handle_set(cmd, session, state).await,
"DEL" => super::handler_kv::handle_del(cmd, session, state).await,
"EXISTS" => super::handler_kv::handle_exists(cmd, session, state).await,
"MGET" => super::handler_kv::handle_mget(cmd, session, state).await,
"MSET" => super::handler_kv::handle_mset(cmd, session, state).await,
"EXPIRE" => handle_expire(cmd, session, state, false).await,
"PEXPIRE" => handle_expire(cmd, session, state, true).await,
"TTL" => handle_ttl(cmd, session, state, false).await,
"PTTL" => handle_ttl(cmd, session, state, true).await,
"PERSIST" => handle_persist(cmd, session, state).await,
"SCAN" => handle_scan(cmd, session, state).await,
"KEYS" => handle_keys(cmd, session, state).await,
"HGET" => super::handler_hash::handle_hget(cmd, session, state).await,
"HMGET" => super::handler_hash::handle_hmget(cmd, session, state).await,
"HSET" => super::handler_hash::handle_hset(cmd, session, state).await,
"FLUSHDB" => super::handler_hash::handle_flushdb(session, state).await,
"PUBLISH" => super::handler_pubsub::handle_publish(cmd, session, state).await,
"INFO" => handle_info(cmd, session, state).await,
"COMMAND" => RespValue::ok(), "QUIT" => RespValue::ok(),
_ => RespValue::err(format!("ERR unknown command '{}'", cmd.name)),
}
}
fn handle_ping(cmd: &RespCommand) -> RespValue {
match cmd.arg(0) {
Some(msg) => RespValue::bulk(msg.to_vec()),
None => RespValue::SimpleString("PONG".into()),
}
}
fn handle_echo(cmd: &RespCommand) -> RespValue {
match cmd.arg(0) {
Some(msg) => RespValue::bulk(msg.to_vec()),
None => RespValue::err("ERR wrong number of arguments for 'echo' command"),
}
}
fn handle_select(cmd: &RespCommand, session: &mut RespSession) -> RespValue {
match cmd.arg_str(0) {
Some(name) => {
session.collection = name.to_string();
RespValue::ok()
}
None => RespValue::err("ERR wrong number of arguments for 'select' command"),
}
}
async fn handle_expire(
cmd: &RespCommand,
session: &RespSession,
state: &SharedState,
is_pexpire: bool,
) -> RespValue {
if cmd.argc() < 2 {
let name = if is_pexpire { "pexpire" } else { "expire" };
return RespValue::err(format!(
"ERR wrong number of arguments for '{name}' command"
));
}
let key = cmd.args[0].clone();
let ttl_ms = match cmd.arg_i64(1) {
Some(v) if v > 0 => {
if is_pexpire {
v as u64
} else {
(v as u64) * 1000
}
}
_ => return RespValue::err("ERR value is not an integer or out of range"),
};
let plan = PhysicalPlan::Kv(KvOp::Expire {
collection: session.collection.clone(),
key,
ttl_ms,
});
match dispatch_kv_write(state, session, plan).await {
Ok(resp) if resp.status == Status::Ok => RespValue::integer(1),
Ok(_) => RespValue::integer(0),
Err(e) => RespValue::err(format!("ERR {e}")),
}
}
async fn handle_ttl(
cmd: &RespCommand,
session: &RespSession,
state: &SharedState,
is_pttl: bool,
) -> RespValue {
let Some(key) = cmd.arg(0) else {
let name = if is_pttl { "pttl" } else { "ttl" };
return RespValue::err(format!(
"ERR wrong number of arguments for '{name}' command"
));
};
let plan = PhysicalPlan::Kv(KvOp::Get {
collection: session.collection.clone(),
key: key.to_vec(),
rls_filters: Vec::new(),
});
match dispatch_kv(state, session, plan).await {
Ok(resp) if resp.status == Status::Ok && !resp.payload.is_empty() => RespValue::integer(-1),
Ok(_) => RespValue::integer(-2),
Err(e) => RespValue::err(format!("ERR {e}")),
}
}
async fn handle_persist(
cmd: &RespCommand,
session: &RespSession,
state: &SharedState,
) -> RespValue {
let Some(key) = cmd.arg(0) else {
return RespValue::err("ERR wrong number of arguments for 'persist' command");
};
let plan = PhysicalPlan::Kv(KvOp::Persist {
collection: session.collection.clone(),
key: key.to_vec(),
});
match dispatch_kv_write(state, session, plan).await {
Ok(resp) if resp.status == Status::Ok => RespValue::integer(1),
Ok(_) => RespValue::integer(0),
Err(e) => RespValue::err(format!("ERR {e}")),
}
}
async fn handle_scan(cmd: &RespCommand, session: &RespSession, state: &SharedState) -> RespValue {
let cursor_str = cmd.arg_str(0).unwrap_or("0");
let cursor = if cursor_str == "0" {
Vec::new()
} else {
base64::Engine::decode(&base64::engine::general_purpose::STANDARD, cursor_str)
.unwrap_or_default()
};
let mut match_pattern: Option<String> = None;
let mut count: usize = 10;
let mut filter_bytes: Vec<u8> = Vec::new();
let mut i = 1;
while i < cmd.argc() {
match cmd.arg_str(i).map(|s| s.to_uppercase()) {
Some(ref flag) if flag == "MATCH" => {
match_pattern = cmd.arg_str(i + 1).map(|s| s.to_string());
i += 2;
}
Some(ref flag) if flag == "COUNT" => {
count = cmd.arg_i64(i + 1).unwrap_or(10) as usize;
i += 2;
}
Some(ref flag) if flag == "FILTER" => {
if i + 4 <= cmd.argc() {
let field = cmd.arg_str(i + 1).unwrap_or("");
let _op = cmd.arg_str(i + 2).unwrap_or(""); let value = cmd.arg_str(i + 3).unwrap_or("");
let scan_filter = serde_json::json!([{
"field": field,
"op": "eq",
"value": value,
}]);
match rmp_serde::to_vec(&scan_filter) {
Ok(bytes) => filter_bytes = bytes,
Err(_) => {
return RespValue::err("ERR filter serialization failed");
}
}
i += 4;
} else {
i += 1;
}
}
_ => {
i += 1;
}
}
}
let plan = PhysicalPlan::Kv(KvOp::Scan {
collection: session.collection.clone(),
cursor,
count,
filters: filter_bytes,
match_pattern,
});
match dispatch_kv(state, session, plan).await {
Ok(resp) if resp.status == Status::Ok => {
let json: serde_json::Value = serde_json::from_slice(&resp.payload).unwrap_or_default();
let next_cursor = json
.get("cursor")
.and_then(|v| v.as_str())
.unwrap_or("0")
.to_string();
let entries = json
.get("entries")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let keys: Vec<RespValue> = entries
.iter()
.filter_map(|e| {
e.get("key").and_then(|k| k.as_str()).and_then(|b64| {
base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64)
.ok()
.map(RespValue::bulk)
})
})
.collect();
RespValue::array(vec![
RespValue::bulk_str(&next_cursor),
RespValue::array(keys),
])
}
Ok(_) => RespValue::array(vec![RespValue::bulk_str("0"), RespValue::array(vec![])]),
Err(e) => RespValue::err(format!("ERR {e}")),
}
}
async fn handle_keys(cmd: &RespCommand, session: &RespSession, state: &SharedState) -> RespValue {
let pattern = cmd.arg_str(0).unwrap_or("*");
let plan = PhysicalPlan::Kv(KvOp::Scan {
collection: session.collection.clone(),
cursor: Vec::new(),
count: 100_000,
filters: Vec::new(),
match_pattern: Some(pattern.to_string()),
});
match dispatch_kv(state, session, plan).await {
Ok(resp) if resp.status == Status::Ok => {
let json: serde_json::Value = serde_json::from_slice(&resp.payload).unwrap_or_default();
let entries = json
.get("entries")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let keys: Vec<RespValue> = entries
.iter()
.filter_map(|e| {
e.get("key").and_then(|k| k.as_str()).and_then(|b64| {
base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64)
.ok()
.map(RespValue::bulk)
})
})
.collect();
RespValue::array(keys)
}
Ok(_) => RespValue::array(vec![]),
Err(e) => RespValue::err(format!("ERR {e}")),
}
}
async fn handle_dbsize(session: &RespSession, state: &SharedState) -> RespValue {
let plan = PhysicalPlan::Kv(KvOp::Scan {
collection: session.collection.clone(),
cursor: Vec::new(),
count: 0,
filters: Vec::new(),
match_pattern: None,
});
match dispatch_kv(state, session, plan).await {
Ok(resp) if resp.status == Status::Ok => {
let json: serde_json::Value = serde_json::from_slice(&resp.payload).unwrap_or_default();
let count = json.get("count").and_then(|v| v.as_i64()).unwrap_or(0);
RespValue::integer(count)
}
_ => RespValue::integer(0),
}
}
async fn handle_info(_cmd: &RespCommand, session: &RespSession, _state: &SharedState) -> RespValue {
let info = format!(
"# Server\r\nnodedb_version:0.1.0\r\n\r\n# Keyspace\r\ndb:{}\r\n",
session.collection
);
RespValue::bulk(info.into_bytes())
}
pub(super) async fn dispatch_kv(
state: &SharedState,
session: &RespSession,
plan: PhysicalPlan,
) -> crate::Result<crate::bridge::envelope::Response> {
let vshard = VShardId::from_collection(&session.collection);
dispatch_utils::dispatch_to_data_plane(state, session.tenant_id, vshard, plan, 0)
.await
.map_err(map_busy_error)
}
pub(super) async fn dispatch_kv_write(
state: &SharedState,
session: &RespSession,
plan: PhysicalPlan,
) -> crate::Result<crate::bridge::envelope::Response> {
let vshard = VShardId::from_collection(&session.collection);
wal_dispatch::wal_append_if_write(&state.wal, session.tenant_id, vshard, &plan)?;
dispatch_utils::dispatch_to_data_plane(state, session.tenant_id, vshard, plan, 0)
.await
.map_err(map_busy_error)
}
fn map_busy_error(e: crate::Error) -> crate::Error {
match &e {
crate::Error::Bridge { .. } | crate::Error::Dispatch { .. } => crate::Error::Bridge {
detail: "BUSY NodeDB is processing requests, retry later".into(),
},
_ => e,
}
}
pub(super) fn parse_json_field_i64(
payload: &crate::bridge::envelope::Payload,
field: &str,
) -> Option<i64> {
let json: serde_json::Value = serde_json::from_slice(payload).ok()?;
json.get(field)?.as_i64()
}