use bytes::Bytes;
use resp_async::response::RespError;
use resp_async::{Cmd, State, Value};
use sqlx::Row;
use std::collections::HashMap;
use std::sync::Arc;
use crate::handlers::util::{
arg_as_bytes, arg_as_f64, arg_as_i64, glob_match, invalid_arguments, random_index,
shuffle_slice, wrong_arity, wrong_type,
};
use crate::state::{AppState, SessionHandle, now_ms};
use crate::storage::{TYPE_HASH, delete_key_all, is_expired, load_meta, map_sql_err};
pub async fn hset(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() < 3 || cmd.args.len() % 2 != 1 {
return Err(wrong_arity("HSET"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let now = now_ms() as i64;
let key_exists = if let Some(meta) = load_meta(auth.pool.as_ref(), key, now).await? {
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
true
} else {
false
};
let mut fields = HashMap::new();
let mut i = 1;
while i < cmd.args.len() {
let field = arg_as_bytes(&cmd.args[i])?.clone();
let value = arg_as_bytes(&cmd.args[i + 1])?.clone();
fields.insert(field, value);
i += 2;
}
let mut tx = auth.pool.begin().await.map_err(map_sql_err)?;
let mut qb =
sqlx::QueryBuilder::new("INSERT IGNORE INTO redis_hash (r_key, h_field, h_value) ");
qb.push_values(fields.iter(), |mut row, (field, value)| {
row.push_bind(key.as_ref());
row.push_bind(field.as_ref());
row.push_bind(value.as_ref());
});
let res = qb.build().execute(&mut *tx).await.map_err(map_sql_err)?;
let added = res.rows_affected() as i64;
if added > 0 {
if key_exists {
let res =
sqlx::query("UPDATE redis_kv SET r_len = r_len + ? WHERE r_key = ? AND r_type = ?")
.bind(added)
.bind(key.as_ref())
.bind(TYPE_HASH)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
if res.rows_affected() == 0 {
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
if let Some(meta) = meta {
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
} else {
sqlx::query(
"INSERT INTO redis_kv (r_key, r_type, r_len, expires_at_ms) \
VALUES (?, ?, ?, NULL)",
)
.bind(key.as_ref())
.bind(TYPE_HASH)
.bind(added)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
}
}
} else {
let inserted = sqlx::query(
"INSERT INTO redis_kv (r_key, r_type, r_len, expires_at_ms) \
VALUES (?, ?, ?, NULL)",
)
.bind(key.as_ref())
.bind(TYPE_HASH)
.bind(added)
.execute(&mut *tx)
.await;
if let Err(err) = inserted {
if is_duplicate_key(&err) {
let res = sqlx::query(
"UPDATE redis_kv SET r_len = r_len + ? WHERE r_key = ? AND r_type = ?",
)
.bind(added)
.bind(key.as_ref())
.bind(TYPE_HASH)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
if res.rows_affected() == 0 {
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
if let Some(meta) = meta
&& meta.r_type != TYPE_HASH
{
return Ok(wrong_type());
}
}
} else {
return Err(map_sql_err(err));
}
}
}
}
if added < fields.len() as i64 {
let mut qb = sqlx::QueryBuilder::new("UPDATE redis_hash SET h_value = CASE h_field ");
for (field, value) in fields.iter() {
qb.push("WHEN ");
qb.push_bind(field.as_ref());
qb.push(" THEN ");
qb.push_bind(value.as_ref());
}
qb.push(" ELSE h_value END WHERE r_key = ");
qb.push_bind(key.as_ref());
qb.push(" AND h_field IN (");
let mut separated = qb.separated(", ");
for field in fields.keys() {
separated.push_bind(field.as_ref());
}
qb.push(")");
qb.build().execute(&mut *tx).await.map_err(map_sql_err)?;
}
tx.commit().await.map_err(map_sql_err)?;
Ok(Value::Integer(added.max(0)))
}
fn is_duplicate_key(err: &sqlx::Error) -> bool {
if let sqlx::Error::Database(db_err) = err
&& let Some(code) = db_err.code()
{
return code.as_ref() == "1062";
}
false
}
pub async fn hsetnx(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 3 {
return Err(wrong_arity("HSETNX"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let field = arg_as_bytes(&cmd.args[1])?;
let value = arg_as_bytes(&cmd.args[2])?;
let now = now_ms() as i64;
if let Some(meta) = load_meta(auth.pool.as_ref(), key, now).await?
&& meta.r_type != TYPE_HASH
{
return Ok(wrong_type());
}
let mut tx = auth.pool.begin().await.map_err(map_sql_err)?;
let res =
sqlx::query("INSERT IGNORE INTO redis_hash (r_key, h_field, h_value) VALUES (?, ?, ?)")
.bind(key.as_ref())
.bind(field.as_ref())
.bind(value.as_ref())
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
let inserted = res.rows_affected() as i64;
if inserted > 0 {
sqlx::query(
"INSERT INTO redis_kv (r_key, r_type, r_len, expires_at_ms) \
VALUES (?, ?, 1, NULL) \
ON DUPLICATE KEY UPDATE r_len = r_len + 1",
)
.bind(key.as_ref())
.bind(TYPE_HASH)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
}
tx.commit().await.map_err(map_sql_err)?;
Ok(Value::Integer(inserted))
}
pub async fn hget(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 2 {
return Err(wrong_arity("HGET"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let field = arg_as_bytes(&cmd.args[1])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Null);
};
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
let row = sqlx::query("SELECT h_value FROM redis_hash WHERE r_key = ? AND h_field = ?")
.bind(key.as_ref())
.bind(field.as_ref())
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
return Ok(Value::Null);
};
let value: Option<Vec<u8>> = row.try_get("h_value").map_err(map_sql_err)?;
Ok(value
.map(Bytes::from)
.map(Value::Bulk)
.unwrap_or(Value::Null))
}
pub async fn hmget(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() < 2 {
return Err(wrong_arity("HMGET"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Array(vec![Value::Null; cmd.args.len() - 1]));
};
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
let mut qb = sqlx::QueryBuilder::new("SELECT h_field, h_value FROM redis_hash WHERE r_key = ");
qb.push_bind(key.as_ref());
qb.push(" AND h_field IN (");
let mut separated = qb.separated(", ");
for arg in cmd.args.iter().skip(1) {
let field = arg_as_bytes(arg)?;
separated.push_bind(field.as_ref());
}
qb.push(")");
let rows = qb
.build()
.fetch_all(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let mut map = HashMap::new();
for row in rows {
let field: Vec<u8> = row.try_get("h_field").map_err(map_sql_err)?;
let value: Option<Vec<u8>> = row.try_get("h_value").map_err(map_sql_err)?;
map.insert(field, value);
}
let mut out = Vec::with_capacity(cmd.args.len() - 1);
for arg in cmd.args.iter().skip(1) {
let field = arg_as_bytes(arg)?;
match map.get(field.as_ref()) {
Some(Some(value)) => out.push(Value::Bulk(Bytes::from(value.clone()))),
Some(None) => out.push(Value::Null),
None => out.push(Value::Null),
}
}
Ok(Value::Array(out))
}
pub async fn hgetall(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 1 {
return Err(wrong_arity("HGETALL"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Array(Vec::new()));
};
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
let rows = sqlx::query("SELECT h_field, h_value FROM redis_hash WHERE r_key = ?")
.bind(key.as_ref())
.fetch_all(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let mut out = Vec::with_capacity(rows.len() * 2);
for row in rows {
let field: Vec<u8> = row.try_get("h_field").map_err(map_sql_err)?;
let value: Option<Vec<u8>> = row.try_get("h_value").map_err(map_sql_err)?;
out.push(Value::Bulk(Bytes::from(field)));
out.push(Value::Bulk(Bytes::from(value.unwrap_or_default())));
}
Ok(Value::Array(out))
}
pub async fn hkeys(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 1 {
return Err(wrong_arity("HKEYS"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Array(Vec::new()));
};
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
let rows = sqlx::query("SELECT h_field FROM redis_hash WHERE r_key = ?")
.bind(key.as_ref())
.fetch_all(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let mut out = Vec::with_capacity(rows.len());
for row in rows {
let field: Vec<u8> = row.try_get("h_field").map_err(map_sql_err)?;
out.push(Value::Bulk(Bytes::from(field)));
}
Ok(Value::Array(out))
}
pub async fn hvals(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 1 {
return Err(wrong_arity("HVALS"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Array(Vec::new()));
};
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
let rows = sqlx::query("SELECT h_value FROM redis_hash WHERE r_key = ?")
.bind(key.as_ref())
.fetch_all(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let mut out = Vec::with_capacity(rows.len());
for row in rows {
let value: Option<Vec<u8>> = row.try_get("h_value").map_err(map_sql_err)?;
out.push(Value::Bulk(Bytes::from(value.unwrap_or_default())));
}
Ok(Value::Array(out))
}
pub async fn hexists(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 2 {
return Err(wrong_arity("HEXISTS"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let field = arg_as_bytes(&cmd.args[1])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Integer(0));
};
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
let row = sqlx::query("SELECT 1 FROM redis_hash WHERE r_key = ? AND h_field = ?")
.bind(key.as_ref())
.bind(field.as_ref())
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
Ok(Value::Integer(if row.is_some() { 1 } else { 0 }))
}
pub async fn hdel(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() < 2 {
return Err(wrong_arity("HDEL"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Integer(0));
};
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
let mut qb = sqlx::QueryBuilder::new("DELETE FROM redis_hash WHERE r_key = ");
qb.push_bind(key.as_ref());
qb.push(" AND h_field IN (");
let mut separated = qb.separated(", ");
for arg in cmd.args.iter().skip(1) {
let field = arg_as_bytes(arg)?;
separated.push_bind(field.as_ref());
}
qb.push(")");
let mut tx = auth.pool.begin().await.map_err(map_sql_err)?;
let res = qb.build().execute(&mut *tx).await.map_err(map_sql_err)?;
let deleted = res.rows_affected() as i64;
if deleted > 0 {
let res =
sqlx::query("UPDATE redis_kv SET r_len = r_len - ? WHERE r_key = ? AND r_type = ?")
.bind(deleted)
.bind(key.as_ref())
.bind(TYPE_HASH)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
if res.rows_affected() > 0 {
sqlx::query("DELETE FROM redis_kv WHERE r_key = ? AND r_type = ? AND r_len <= 0")
.bind(key.as_ref())
.bind(TYPE_HASH)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
}
}
tx.commit().await.map_err(map_sql_err)?;
Ok(Value::Integer(deleted))
}
pub async fn hlen(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 1 {
return Err(wrong_arity("HLEN"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let now = now_ms() as i64;
let meta = match load_meta(auth.pool.as_ref(), key, now).await {
Ok(meta) => meta,
Err(err) => {
if let RespError::InvalidData(msg) = &err
&& msg.starts_with(b"ERR backend")
{
return hlen_fallback(auth.pool.as_ref(), key, now).await;
}
return Err(err);
}
};
let Some(meta) = meta else {
return Ok(Value::Integer(0));
};
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
Ok(Value::Integer(meta.r_len))
}
async fn hlen_fallback(pool: &sqlx::MySqlPool, key: &Bytes, now: i64) -> Result<Value, RespError> {
let row = sqlx::query("SELECT r_type, expires_at_ms FROM redis_kv WHERE r_key = ?")
.bind(key.as_ref())
.fetch_optional(pool)
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
return Ok(Value::Integer(0));
};
let r_type: u8 = row.try_get("r_type").map_err(map_sql_err)?;
let expires_at_ms: Option<i64> = row.try_get("expires_at_ms").map_err(map_sql_err)?;
if is_expired(expires_at_ms, now) {
delete_key_all(pool, key).await?;
return Ok(Value::Integer(0));
}
if r_type != TYPE_HASH {
return Ok(wrong_type());
}
let row = sqlx::query("SELECT COUNT(*) AS count FROM redis_hash WHERE r_key = ?")
.bind(key.as_ref())
.fetch_one(pool)
.await
.map_err(map_sql_err)?;
let count: i64 = row.try_get("count").map_err(map_sql_err)?;
Ok(Value::Integer(count))
}
pub async fn hstrlen(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 2 {
return Err(wrong_arity("HSTRLEN"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let field = arg_as_bytes(&cmd.args[1])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Integer(0));
};
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
let row = sqlx::query(
"SELECT OCTET_LENGTH(h_value) AS len FROM redis_hash WHERE r_key = ? AND h_field = ?",
)
.bind(key.as_ref())
.bind(field.as_ref())
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
return Ok(Value::Integer(0));
};
let len: i64 = row.try_get("len").map_err(map_sql_err)?;
Ok(Value::Integer(len))
}
pub async fn hincrby(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
hincrby_inner(cmd, state, session, false).await
}
pub async fn hincrbyfloat(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
hincrby_inner(cmd, state, session, true).await
}
async fn hincrby_inner(
cmd: resp_async::Command,
state: Arc<AppState>,
session: Arc<crate::state::Session>,
float_mode: bool,
) -> Result<Value, RespError> {
if cmd.args.len() != 3 {
return Err(wrong_arity(if float_mode {
"HINCRBYFLOAT"
} else {
"HINCRBY"
}));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let field = arg_as_bytes(&cmd.args[1])?;
let increment_i = if float_mode {
None
} else {
Some(arg_as_i64(&cmd.args[2])?)
};
let increment_f = if float_mode {
Some(arg_as_f64(&cmd.args[2])?)
} else {
None
};
let now = now_ms() as i64;
if let Some(meta) = load_meta(auth.pool.as_ref(), key, now).await?
&& meta.r_type != TYPE_HASH
{
return Ok(wrong_type());
}
for _ in 0..3 {
let row = sqlx::query("SELECT h_value FROM redis_hash WHERE r_key = ? AND h_field = ?")
.bind(key.as_ref())
.bind(field.as_ref())
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let mut exists = false;
let new_value = if let Some(row) = row {
exists = true;
let value: Option<Vec<u8>> = row.try_get("h_value").map_err(map_sql_err)?;
let value = value.unwrap_or_default();
if float_mode {
let base = std::str::from_utf8(&value)
.ok()
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| RespError::invalid_data("ERR value is not a valid float"))?;
let inc = increment_f.unwrap_or(0.0);
(base + inc).to_string()
} else {
let base = std::str::from_utf8(&value)
.ok()
.and_then(|s| s.parse::<i64>().ok())
.ok_or_else(|| {
invalid_arguments("ERR value is not an integer or out of range")
})?;
let inc = increment_i.unwrap_or(0);
base.checked_add(inc)
.ok_or_else(|| invalid_arguments("ERR increment or decrement would overflow"))?
.to_string()
}
} else if float_mode {
increment_f.unwrap_or(0.0).to_string()
} else {
increment_i.unwrap_or(0).to_string()
};
if exists {
let res =
sqlx::query("UPDATE redis_hash SET h_value = ? WHERE r_key = ? AND h_field = ?")
.bind(new_value.as_bytes())
.bind(key.as_ref())
.bind(field.as_ref())
.execute(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
if res.rows_affected() == 0 {
continue;
}
} else {
let res = sqlx::query(
"INSERT IGNORE INTO redis_hash (r_key, h_field, h_value) VALUES (?, ?, ?)",
)
.bind(key.as_ref())
.bind(field.as_ref())
.bind(new_value.as_bytes())
.execute(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
if res.rows_affected() == 0 {
continue;
}
sqlx::query(
"INSERT INTO redis_kv (r_key, r_type, r_len, expires_at_ms) \
VALUES (?, ?, 1, NULL) \
ON DUPLICATE KEY UPDATE r_len = r_len + 1",
)
.bind(key.as_ref())
.bind(TYPE_HASH)
.execute(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
}
if float_mode {
return Ok(Value::Bulk(Bytes::from(new_value)));
}
let value: i64 = new_value.parse().unwrap_or_default();
return Ok(Value::Integer(value));
}
Err(RespError::internal())
}
pub async fn hrandfield(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.is_empty() {
return Err(wrong_arity("HRANDFIELD"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let mut count: Option<i64> = None;
let mut with_values = false;
if cmd.args.len() >= 2 {
if let Ok(value) = arg_as_i64(&cmd.args[1]) {
count = Some(value);
if cmd.args.len() >= 3 {
let mut token = arg_as_bytes(&cmd.args[2])?.to_vec();
for b in &mut token {
b.make_ascii_uppercase();
}
if token.as_slice() == b"WITHVALUES" {
with_values = true;
}
}
} else {
let mut token = arg_as_bytes(&cmd.args[1])?.to_vec();
for b in &mut token {
b.make_ascii_uppercase();
}
if token.as_slice() == b"WITHVALUES" {
with_values = true;
}
}
}
let key = arg_as_bytes(&cmd.args[0])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(if count.is_some() {
Value::Array(Vec::new())
} else {
Value::Null
});
};
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
let len = meta.r_len;
if len <= 0 {
return Ok(if count.is_some() {
Value::Array(Vec::new())
} else {
Value::Null
});
}
match count {
None => {
let offset = random_index(len);
if !with_values {
let row = sqlx::query(
"SELECT h_field FROM redis_hash WHERE r_key = ? ORDER BY h_field LIMIT 1 OFFSET ?",
)
.bind(key.as_ref())
.bind(offset)
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
return Ok(Value::Null);
};
let field: Vec<u8> = row.try_get("h_field").map_err(map_sql_err)?;
return Ok(Value::Bulk(Bytes::from(field)));
}
let row = sqlx::query(
"SELECT h_field, h_value FROM redis_hash WHERE r_key = ? ORDER BY h_field LIMIT 1 OFFSET ?",
)
.bind(key.as_ref())
.bind(offset)
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
return Ok(Value::Null);
};
let field: Vec<u8> = row.try_get("h_field").map_err(map_sql_err)?;
let value: Option<Vec<u8>> = row.try_get("h_value").map_err(map_sql_err)?;
Ok(Value::Array(vec![
Value::Bulk(Bytes::from(field)),
Value::Bulk(Bytes::from(value.unwrap_or_default())),
]))
}
Some(n) if n >= 0 => {
if n == 0 {
return Ok(Value::Array(Vec::new()));
}
let fetch_all = n >= len;
if !with_values {
let rows = if fetch_all {
sqlx::query("SELECT h_field FROM redis_hash WHERE r_key = ? ORDER BY h_field")
.bind(key.as_ref())
.fetch_all(auth.pool.as_ref())
.await
.map_err(map_sql_err)?
} else {
let range = len - n + 1;
let offset = random_index(range);
sqlx::query(
"SELECT h_field FROM redis_hash WHERE r_key = ? ORDER BY h_field LIMIT ? OFFSET ?",
)
.bind(key.as_ref())
.bind(n)
.bind(offset)
.fetch_all(auth.pool.as_ref())
.await
.map_err(map_sql_err)?
};
let mut fields = Vec::with_capacity(rows.len());
for row in rows {
let field: Vec<u8> = row.try_get("h_field").map_err(map_sql_err)?;
fields.push(Bytes::from(field));
}
if fetch_all {
shuffle_slice(&mut fields);
}
let out = fields.into_iter().map(Value::Bulk).collect();
return Ok(Value::Array(out));
}
let rows = if fetch_all {
sqlx::query(
"SELECT h_field, h_value FROM redis_hash WHERE r_key = ? ORDER BY h_field",
)
.bind(key.as_ref())
.fetch_all(auth.pool.as_ref())
.await
.map_err(map_sql_err)?
} else {
let range = len - n + 1;
let offset = random_index(range);
sqlx::query(
"SELECT h_field, h_value FROM redis_hash WHERE r_key = ? ORDER BY h_field LIMIT ? OFFSET ?",
)
.bind(key.as_ref())
.bind(n)
.bind(offset)
.fetch_all(auth.pool.as_ref())
.await
.map_err(map_sql_err)?
};
let mut pairs = Vec::with_capacity(rows.len());
for row in rows {
let field: Vec<u8> = row.try_get("h_field").map_err(map_sql_err)?;
let value: Option<Vec<u8>> = row.try_get("h_value").map_err(map_sql_err)?;
pairs.push((Bytes::from(field), Bytes::from(value.unwrap_or_default())));
}
if fetch_all {
shuffle_slice(&mut pairs);
}
let mut out = Vec::with_capacity(pairs.len() * 2);
for (field, value) in pairs {
out.push(Value::Bulk(field));
out.push(Value::Bulk(value));
}
Ok(Value::Array(out))
}
Some(n) => {
let n = n.abs();
let mut out = Vec::new();
if !with_values {
for _ in 0..n {
let offset = random_index(len);
let row = sqlx::query(
"SELECT h_field FROM redis_hash WHERE r_key = ? ORDER BY h_field LIMIT 1 OFFSET ?",
)
.bind(key.as_ref())
.bind(offset)
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
break;
};
let field: Vec<u8> = row.try_get("h_field").map_err(map_sql_err)?;
out.push(Value::Bulk(Bytes::from(field)));
}
return Ok(Value::Array(out));
}
for _ in 0..n {
let offset = random_index(len);
let row = sqlx::query(
"SELECT h_field, h_value FROM redis_hash WHERE r_key = ? ORDER BY h_field LIMIT 1 OFFSET ?",
)
.bind(key.as_ref())
.bind(offset)
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
break;
};
let field: Vec<u8> = row.try_get("h_field").map_err(map_sql_err)?;
let value: Option<Vec<u8>> = row.try_get("h_value").map_err(map_sql_err)?;
out.push(Value::Bulk(Bytes::from(field)));
out.push(Value::Bulk(Bytes::from(value.unwrap_or_default())));
}
Ok(Value::Array(out))
}
}
}
pub async fn hscan(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() < 2 {
return Err(wrong_arity("HSCAN"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Array(vec![
Value::Bulk(Bytes::from_static(b"0")),
Value::Array(Vec::new()),
]));
};
if meta.r_type != TYPE_HASH {
return Ok(wrong_type());
}
let cursor = arg_as_bytes(&cmd.args[1])?;
let mut count = 10i64;
let mut pattern: Option<Bytes> = None;
let mut i = 2;
while i < cmd.args.len() {
let mut token = arg_as_bytes(&cmd.args[i])?.to_vec();
for b in &mut token {
b.make_ascii_uppercase();
}
match token.as_slice() {
b"MATCH" => {
let next = cmd
.args
.get(i + 1)
.ok_or_else(|| RespError::invalid_data("ERR syntax error"))?;
pattern = Some(arg_as_bytes(next)?.clone());
i += 2;
}
b"COUNT" => {
let next = cmd
.args
.get(i + 1)
.ok_or_else(|| RespError::invalid_data("ERR syntax error"))?;
count = arg_as_i64(next)?;
if count <= 0 {
return Err(RespError::invalid_data("ERR invalid COUNT"));
}
i += 2;
}
_ => return Err(RespError::invalid_data("ERR syntax error")),
}
}
let rows = sqlx::query(
"SELECT h_field, h_value FROM redis_hash WHERE r_key = ? AND h_field > ? \
ORDER BY h_field LIMIT ?",
)
.bind(key.as_ref())
.bind(cursor.as_ref())
.bind(count)
.fetch_all(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let mut out = Vec::new();
let mut next_cursor: Option<Bytes> = None;
for row in &rows {
let field: Vec<u8> = row.try_get("h_field").map_err(map_sql_err)?;
let value: Option<Vec<u8>> = row.try_get("h_value").map_err(map_sql_err)?;
next_cursor = Some(Bytes::from(field.clone()));
if let Some(pattern) = &pattern
&& !glob_match(pattern.as_ref(), &field)
{
continue;
}
out.push(Value::Bulk(Bytes::from(field)));
out.push(Value::Bulk(Bytes::from(value.unwrap_or_default())));
}
let cursor_value = match next_cursor {
Some(value) if rows.len() as i64 == count => value,
_ => Bytes::from_static(b"0"),
};
Ok(Value::Array(vec![
Value::Bulk(cursor_value),
Value::Array(out),
]))
}