use bytes::Bytes;
use resp_async::response::RespError;
use resp_async::{Cmd, State, Value};
use sqlx::Row;
use std::sync::Arc;
use tokio::time::{self, Duration, Instant};
use crate::handlers::util::{arg_as_bytes, arg_as_f64, arg_as_i64, ok, wrong_arity, wrong_type};
use crate::state::{AppState, SessionHandle, now_ms};
use crate::storage::{TYPE_LIST, load_meta, map_sql_err};
#[derive(Debug, Clone, Copy)]
struct ListMeta {
head: i64,
tail: i64,
len: i64,
}
pub async fn lpush(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
push(cmd, state, session, true, false).await
}
pub async fn rpush(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
push(cmd, state, session, false, false).await
}
pub async fn lpushx(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
push(cmd, state, session, true, true).await
}
pub async fn rpushx(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
push(cmd, state, session, false, true).await
}
async fn push(
cmd: resp_async::Command,
state: Arc<AppState>,
session: Arc<crate::state::Session>,
left: bool,
only_if_exists: bool,
) -> Result<Value, RespError> {
if cmd.args.len() < 2 {
return Err(wrong_arity(if left { "LPUSH" } else { "RPUSH" }));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let values = cmd
.args
.iter()
.skip(1)
.map(arg_as_bytes)
.collect::<Result<Vec<_>, _>>()?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
if let Some(meta) = meta {
if meta.r_type != TYPE_LIST {
return Ok(wrong_type());
}
} else if only_if_exists {
return Ok(Value::Integer(0));
}
let mut tx = auth.pool.begin().await.map_err(map_sql_err)?;
let list_meta = fetch_list_meta(&mut tx, key).await?;
let mut head;
let mut tail;
let mut len;
if let Some(meta) = list_meta {
head = meta.head;
tail = meta.tail;
len = meta.len;
} else {
head = 0;
tail = -1;
len = 0;
sqlx::query(
"INSERT INTO redis_kv (r_key, r_type, r_len, expires_at_ms) VALUES (?, ?, 0, NULL)",
)
.bind(key.as_ref())
.bind(TYPE_LIST)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
}
let mut entries = Vec::with_capacity(values.len());
if left {
for value in values {
head -= 1;
if len == 0 {
tail = head;
}
len += 1;
entries.push((head, value));
}
} else {
for value in values {
tail += 1;
if len == 0 {
head = tail;
}
len += 1;
entries.push((tail, value));
}
}
if !entries.is_empty() {
let mut qb = sqlx::QueryBuilder::new("INSERT INTO redis_list (r_key, seq, l_value) ");
qb.push_values(entries.iter(), |mut row, (seq, value)| {
row.push_bind(key.as_ref());
row.push_bind(*seq);
row.push_bind(value.as_ref());
});
qb.build().execute(&mut *tx).await.map_err(map_sql_err)?;
}
upsert_list_meta(&mut tx, key, head, tail, len).await?;
tx.commit().await.map_err(map_sql_err)?;
Ok(Value::Integer(len))
}
pub async fn lpop(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
pop(cmd, state, session, true).await
}
pub async fn rpop(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
pop(cmd, state, session, false).await
}
async fn pop(
cmd: resp_async::Command,
state: Arc<AppState>,
session: Arc<crate::state::Session>,
left: bool,
) -> Result<Value, RespError> {
if cmd.args.len() != 1 {
return Err(wrong_arity(if left { "LPOP" } else { "RPOP" }));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Null);
};
if meta.r_type != TYPE_LIST {
return Ok(wrong_type());
}
let mut tx = auth.pool.begin().await.map_err(map_sql_err)?;
let list_meta = fetch_list_meta(&mut tx, key).await?;
let Some(meta) = list_meta else {
tx.commit().await.map_err(map_sql_err)?;
return Ok(Value::Null);
};
if meta.len <= 0 {
tx.commit().await.map_err(map_sql_err)?;
return Ok(Value::Null);
}
let seq = if left { meta.head } else { meta.tail };
let row = sqlx::query("SELECT l_value FROM redis_list WHERE r_key = ? AND seq = ?")
.bind(key.as_ref())
.bind(seq)
.fetch_optional(&mut *tx)
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
tx.commit().await.map_err(map_sql_err)?;
return Ok(Value::Null);
};
let value: Option<Vec<u8>> = row.try_get("l_value").map_err(map_sql_err)?;
sqlx::query("DELETE FROM redis_list WHERE r_key = ? AND seq = ?")
.bind(key.as_ref())
.bind(seq)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
let new_len = meta.len - 1;
if new_len <= 0 {
delete_list(&mut tx, key).await?;
} else {
let new_head = if left { meta.head + 1 } else { meta.head };
let new_tail = if left { meta.tail } else { meta.tail - 1 };
upsert_list_meta(&mut tx, key, new_head, new_tail, new_len).await?;
}
tx.commit().await.map_err(map_sql_err)?;
Ok(value
.map(Bytes::from)
.map(Value::Bulk)
.unwrap_or(Value::Null))
}
pub async fn llen(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 1 {
return Err(wrong_arity("LLEN"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Integer(0));
};
if meta.r_type != TYPE_LIST {
return Ok(wrong_type());
}
let row = sqlx::query("SELECT len FROM redis_list_meta WHERE r_key = ?")
.bind(key.as_ref())
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
return Ok(Value::Integer(0));
};
let len: i64 = row.try_get("len").map_err(map_sql_err)?;
Ok(Value::Integer(len))
}
pub async fn lrange(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 3 {
return Err(wrong_arity("LRANGE"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let start = arg_as_i64(&cmd.args[1])?;
let stop = arg_as_i64(&cmd.args[2])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Array(Vec::new()));
};
if meta.r_type != TYPE_LIST {
return Ok(wrong_type());
}
let list_meta = fetch_list_meta_pool(auth.pool.as_ref(), key).await?;
let Some(list_meta) = list_meta else {
return Ok(Value::Array(Vec::new()));
};
let Some((offset, count)) = normalize_range(start, stop, list_meta.len) else {
return Ok(Value::Array(Vec::new()));
};
let rows =
sqlx::query("SELECT l_value FROM redis_list WHERE r_key = ? ORDER BY seq LIMIT ?, ?")
.bind(key.as_ref())
.bind(offset)
.bind(count)
.fetch_all(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let mut out = Vec::with_capacity(rows.len());
for row in rows {
let value: Option<Vec<u8>> = row.try_get("l_value").map_err(map_sql_err)?;
out.push(Value::Bulk(Bytes::from(value.unwrap_or_default())));
}
Ok(Value::Array(out))
}
pub async fn lindex(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 2 {
return Err(wrong_arity("LINDEX"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let index = arg_as_i64(&cmd.args[1])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Null);
};
if meta.r_type != TYPE_LIST {
return Ok(wrong_type());
}
let list_meta = fetch_list_meta_pool(auth.pool.as_ref(), key).await?;
let Some(list_meta) = list_meta else {
return Ok(Value::Null);
};
let Some((offset, _)) = normalize_range(index, index, list_meta.len) else {
return Ok(Value::Null);
};
let row =
sqlx::query("SELECT l_value FROM redis_list WHERE r_key = ? ORDER BY seq LIMIT 1 OFFSET ?")
.bind(key.as_ref())
.bind(offset)
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
return Ok(Value::Null);
};
let value: Option<Vec<u8>> = row.try_get("l_value").map_err(map_sql_err)?;
Ok(value
.map(Bytes::from)
.map(Value::Bulk)
.unwrap_or(Value::Null))
}
pub async fn lset(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 3 {
return Err(wrong_arity("LSET"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let index = arg_as_i64(&cmd.args[1])?;
let value = arg_as_bytes(&cmd.args[2])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Err(RespError::invalid_data("ERR no such key"));
};
if meta.r_type != TYPE_LIST {
return Ok(wrong_type());
}
let list_meta = fetch_list_meta_pool(auth.pool.as_ref(), key).await?;
let Some(list_meta) = list_meta else {
return Err(RespError::invalid_data("ERR no such key"));
};
let Some((offset, _)) = normalize_range(index, index, list_meta.len) else {
return Err(RespError::invalid_data("ERR index out of range"));
};
let row =
sqlx::query("SELECT seq FROM redis_list WHERE r_key = ? ORDER BY seq LIMIT 1 OFFSET ?")
.bind(key.as_ref())
.bind(offset)
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
return Err(RespError::invalid_data("ERR index out of range"));
};
let seq: i64 = row.try_get("seq").map_err(map_sql_err)?;
sqlx::query("UPDATE redis_list SET l_value = ? WHERE r_key = ? AND seq = ?")
.bind(value.as_ref())
.bind(key.as_ref())
.bind(seq)
.execute(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
Ok(ok())
}
pub async fn lrem(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 3 {
return Err(wrong_arity("LREM"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let count = arg_as_i64(&cmd.args[1])?;
let value = arg_as_bytes(&cmd.args[2])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Integer(0));
};
if meta.r_type != TYPE_LIST {
return Ok(wrong_type());
}
let list_meta = fetch_list_meta_pool(auth.pool.as_ref(), key).await?;
let Some(list_meta) = list_meta else {
return Ok(Value::Integer(0));
};
let mut sql = String::from("SELECT seq FROM redis_list WHERE r_key = ? AND l_value = ?");
if count > 0 {
sql.push_str(" ORDER BY seq ASC LIMIT ?");
} else if count < 0 {
sql.push_str(" ORDER BY seq DESC LIMIT ?");
}
let mut query = sqlx::query(&sql).bind(key.as_ref()).bind(value.as_ref());
if count != 0 {
query = query.bind(count.abs());
}
let rows = query
.fetch_all(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
if rows.is_empty() {
return Ok(Value::Integer(0));
}
let mut seqs = Vec::with_capacity(rows.len());
for row in rows {
let seq: i64 = row.try_get("seq").map_err(map_sql_err)?;
seqs.push(seq);
}
let mut qb = sqlx::QueryBuilder::new("DELETE FROM redis_list WHERE r_key = ");
qb.push_bind(key.as_ref());
qb.push(" AND seq IN (");
let mut separated = qb.separated(", ");
for seq in &seqs {
separated.push_bind(seq);
}
qb.push(")");
let mut tx = auth.pool.begin().await.map_err(map_sql_err)?;
qb.build().execute(&mut *tx).await.map_err(map_sql_err)?;
let new_len = list_meta.len - seqs.len() as i64;
if new_len <= 0 {
delete_list(&mut tx, key).await?;
} else {
let row = sqlx::query(
"SELECT MIN(seq) AS head, MAX(seq) AS tail FROM redis_list WHERE r_key = ?",
)
.bind(key.as_ref())
.fetch_one(&mut *tx)
.await
.map_err(map_sql_err)?;
let head: i64 = row.try_get("head").map_err(map_sql_err)?;
let tail: i64 = row.try_get("tail").map_err(map_sql_err)?;
upsert_list_meta(&mut tx, key, head, tail, new_len).await?;
}
tx.commit().await.map_err(map_sql_err)?;
Ok(Value::Integer(seqs.len() as i64))
}
pub async fn ltrim(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 3 {
return Err(wrong_arity("LTRIM"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let start = arg_as_i64(&cmd.args[1])?;
let stop = arg_as_i64(&cmd.args[2])?;
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(ok());
};
if meta.r_type != TYPE_LIST {
return Ok(wrong_type());
}
let list_meta = fetch_list_meta_pool(auth.pool.as_ref(), key).await?;
let Some(list_meta) = list_meta else {
return Ok(ok());
};
let Some((offset, count)) = normalize_range(start, stop, list_meta.len) else {
let mut tx = auth.pool.begin().await.map_err(map_sql_err)?;
delete_list(&mut tx, key).await?;
tx.commit().await.map_err(map_sql_err)?;
return Ok(ok());
};
let row_start =
sqlx::query("SELECT seq FROM redis_list WHERE r_key = ? ORDER BY seq LIMIT 1 OFFSET ?")
.bind(key.as_ref())
.bind(offset)
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let row_stop =
sqlx::query("SELECT seq FROM redis_list WHERE r_key = ? ORDER BY seq LIMIT 1 OFFSET ?")
.bind(key.as_ref())
.bind(offset + count - 1)
.fetch_optional(auth.pool.as_ref())
.await
.map_err(map_sql_err)?;
let (Some(row_start), Some(row_stop)) = (row_start, row_stop) else {
return Ok(ok());
};
let start_seq: i64 = row_start.try_get("seq").map_err(map_sql_err)?;
let stop_seq: i64 = row_stop.try_get("seq").map_err(map_sql_err)?;
let mut tx = auth.pool.begin().await.map_err(map_sql_err)?;
sqlx::query("DELETE FROM redis_list WHERE r_key = ? AND (seq < ? OR seq > ?)")
.bind(key.as_ref())
.bind(start_seq)
.bind(stop_seq)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
upsert_list_meta(&mut tx, key, start_seq, stop_seq, count).await?;
tx.commit().await.map_err(map_sql_err)?;
Ok(ok())
}
pub async fn linsert(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 4 {
return Err(wrong_arity("LINSERT"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let key = arg_as_bytes(&cmd.args[0])?;
let position = arg_as_bytes(&cmd.args[1])?;
let pivot = arg_as_bytes(&cmd.args[2])?;
let value = arg_as_bytes(&cmd.args[3])?;
let before = match position.as_ref() {
b"BEFORE" | b"before" => true,
b"AFTER" | b"after" => false,
_ => return Err(RespError::invalid_data("ERR syntax error")),
};
let now = now_ms() as i64;
let meta = load_meta(auth.pool.as_ref(), key, now).await?;
let Some(meta) = meta else {
return Ok(Value::Integer(0));
};
if meta.r_type != TYPE_LIST {
return Ok(wrong_type());
}
let mut tx = auth.pool.begin().await.map_err(map_sql_err)?;
let pivot_row = sqlx::query(
"SELECT seq FROM redis_list WHERE r_key = ? AND l_value = ? ORDER BY seq LIMIT 1",
)
.bind(key.as_ref())
.bind(pivot.as_ref())
.fetch_optional(&mut *tx)
.await
.map_err(map_sql_err)?;
let Some(pivot_row) = pivot_row else {
tx.commit().await.map_err(map_sql_err)?;
return Ok(Value::Integer(0));
};
let pivot_seq: i64 = pivot_row.try_get("seq").map_err(map_sql_err)?;
let neighbor_sql = if before {
"SELECT MAX(seq) AS seq FROM redis_list WHERE r_key = ? AND seq < ?"
} else {
"SELECT MIN(seq) AS seq FROM redis_list WHERE r_key = ? AND seq > ?"
};
let neighbor_row = sqlx::query(neighbor_sql)
.bind(key.as_ref())
.bind(pivot_seq)
.fetch_one(&mut *tx)
.await
.map_err(map_sql_err)?;
let neighbor_seq: Option<i64> = neighbor_row.try_get("seq").map_err(map_sql_err)?;
let list_meta = fetch_list_meta(&mut tx, key)
.await?
.ok_or_else(RespError::internal)?;
let new_seq = if let Some(neighbor) = neighbor_seq {
let gap = (pivot_seq - neighbor).abs();
if gap > 1 {
if before {
neighbor + (gap / 2)
} else {
pivot_seq + (gap / 2)
}
} else {
reindex_list(&mut tx, key, list_meta.len).await?;
let updated = fetch_list_meta(&mut tx, key)
.await?
.ok_or_else(RespError::internal)?;
if before {
updated.head - 1
} else {
updated.tail + 1
}
}
} else if before {
list_meta.head - 1
} else {
list_meta.tail + 1
};
sqlx::query("INSERT INTO redis_list (r_key, seq, l_value) VALUES (?, ?, ?)")
.bind(key.as_ref())
.bind(new_seq)
.bind(value.as_ref())
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
let new_head = list_meta.head.min(new_seq);
let new_tail = list_meta.tail.max(new_seq);
let new_len = list_meta.len + 1;
upsert_list_meta(&mut tx, key, new_head, new_tail, new_len).await?;
tx.commit().await.map_err(map_sql_err)?;
Ok(Value::Integer(new_len))
}
pub async fn rpoplpush(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 2 {
return Err(wrong_arity("RPOPLPUSH"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let source = arg_as_bytes(&cmd.args[0])?;
let dest = arg_as_bytes(&cmd.args[1])?;
let now = now_ms() as i64;
let src_meta = load_meta(auth.pool.as_ref(), source, now).await?;
let Some(src_meta) = src_meta else {
return Ok(Value::Null);
};
if src_meta.r_type != TYPE_LIST {
return Ok(wrong_type());
}
let dest_meta = load_meta(auth.pool.as_ref(), dest, now).await?;
if let Some(meta) = dest_meta
&& meta.r_type != TYPE_LIST
{
return Ok(wrong_type());
}
let mut tx = auth.pool.begin().await.map_err(map_sql_err)?;
let src_state = fetch_list_meta(&mut tx, source).await?;
let Some(src_state) = src_state else {
tx.commit().await.map_err(map_sql_err)?;
return Ok(Value::Null);
};
if src_state.len <= 0 {
tx.commit().await.map_err(map_sql_err)?;
return Ok(Value::Null);
}
let row = sqlx::query("SELECT l_value FROM redis_list WHERE r_key = ? AND seq = ?")
.bind(source.as_ref())
.bind(src_state.tail)
.fetch_optional(&mut *tx)
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
tx.commit().await.map_err(map_sql_err)?;
return Ok(Value::Null);
};
let value: Option<Vec<u8>> = row.try_get("l_value").map_err(map_sql_err)?;
let value = value.unwrap_or_default();
sqlx::query("DELETE FROM redis_list WHERE r_key = ? AND seq = ?")
.bind(source.as_ref())
.bind(src_state.tail)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
let new_src_len = src_state.len - 1;
if new_src_len <= 0 {
delete_list(&mut tx, source).await?;
} else {
upsert_list_meta(
&mut tx,
source,
src_state.head,
src_state.tail - 1,
new_src_len,
)
.await?;
}
let dest_state = fetch_list_meta(&mut tx, dest).await?;
let mut dest_head = 0;
let mut dest_tail = -1;
let mut dest_len = 0;
if let Some(meta) = dest_state {
dest_head = meta.head;
dest_tail = meta.tail;
dest_len = meta.len;
} else {
sqlx::query(
"INSERT INTO redis_kv (r_key, r_type, r_len, expires_at_ms) VALUES (?, ?, 0, NULL)",
)
.bind(dest.as_ref())
.bind(TYPE_LIST)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
}
dest_head -= 1;
sqlx::query("INSERT INTO redis_list (r_key, seq, l_value) VALUES (?, ?, ?)")
.bind(dest.as_ref())
.bind(dest_head)
.bind(&value)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
if dest_len == 0 {
dest_tail = dest_head;
}
dest_len += 1;
upsert_list_meta(&mut tx, dest, dest_head, dest_tail, dest_len).await?;
tx.commit().await.map_err(map_sql_err)?;
Ok(Value::Bulk(Bytes::from(value)))
}
pub async fn blpop(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
blocking_pop(cmd, state, session, true).await
}
pub async fn brpop(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
blocking_pop(cmd, state, session, false).await
}
async fn blocking_pop(
cmd: resp_async::Command,
state: Arc<AppState>,
session: Arc<crate::state::Session>,
left: bool,
) -> Result<Value, RespError> {
if cmd.args.len() < 2 {
return Err(wrong_arity(if left { "BLPOP" } else { "BRPOP" }));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let timeout = arg_as_f64(cmd.args.last().unwrap())?;
if timeout < 0.0 {
return Err(RespError::invalid_data("ERR timeout is negative"));
}
let keys = cmd
.args
.iter()
.take(cmd.args.len() - 1)
.map(|arg| arg_as_bytes(arg).cloned())
.collect::<Result<Vec<_>, _>>()?;
let deadline = if timeout == 0.0 {
None
} else {
Some(Instant::now() + Duration::from_secs_f64(timeout))
};
loop {
for key in &keys {
match try_pop(auth.pool.as_ref(), key, left).await? {
PopResult::Value(value) => {
return Ok(Value::Array(vec![
Value::Bulk(key.clone()),
Value::Bulk(value),
]));
}
PopResult::WrongType => return Ok(wrong_type()),
PopResult::Empty => {}
}
}
if let Some(deadline) = deadline
&& Instant::now() >= deadline
{
return Ok(Value::Null);
}
time::sleep(state.config.pubsub.poll_interval).await;
}
}
pub async fn brpoplpush(
Cmd(cmd): Cmd,
State(state): State<AppState>,
SessionHandle(session): SessionHandle,
) -> Result<Value, RespError> {
if cmd.args.len() != 3 {
return Err(wrong_arity("BRPOPLPUSH"));
}
let auth = session.auth().await.ok_or(RespError::NoAuth)?;
state.pools.touch(&auth.user);
let source = arg_as_bytes(&cmd.args[0])?;
let dest = arg_as_bytes(&cmd.args[1])?;
let timeout = arg_as_f64(&cmd.args[2])?;
if timeout < 0.0 {
return Err(RespError::invalid_data("ERR timeout is negative"));
}
let deadline = if timeout == 0.0 {
None
} else {
Some(Instant::now() + Duration::from_secs_f64(timeout))
};
loop {
let cmd = resp_async::Command {
name: Bytes::from_static(b"RPOPLPUSH"),
name_upper: Bytes::from_static(b"RPOPLPUSH"),
args: vec![Value::Bulk(source.clone()), Value::Bulk(dest.clone())],
};
let value = rpoplpush(
Cmd(cmd),
State(state.clone()),
SessionHandle(session.clone()),
)
.await?;
if !matches!(value, Value::Null) {
return Ok(value);
}
if let Some(deadline) = deadline
&& Instant::now() >= deadline
{
return Ok(Value::Null);
}
time::sleep(state.config.pubsub.poll_interval).await;
}
}
enum PopResult {
Value(Bytes),
Empty,
WrongType,
}
async fn try_pop(pool: &sqlx::MySqlPool, key: &Bytes, left: bool) -> Result<PopResult, RespError> {
let now = now_ms() as i64;
let meta = load_meta(pool, key, now).await?;
let Some(meta) = meta else {
return Ok(PopResult::Empty);
};
if meta.r_type != TYPE_LIST {
return Ok(PopResult::WrongType);
}
let mut tx = pool.begin().await.map_err(map_sql_err)?;
let list_meta = fetch_list_meta(&mut tx, key).await?;
let Some(list_meta) = list_meta else {
tx.commit().await.map_err(map_sql_err)?;
return Ok(PopResult::Empty);
};
if list_meta.len <= 0 {
tx.commit().await.map_err(map_sql_err)?;
return Ok(PopResult::Empty);
}
let seq = if left { list_meta.head } else { list_meta.tail };
let row = sqlx::query("SELECT l_value FROM redis_list WHERE r_key = ? AND seq = ?")
.bind(key.as_ref())
.bind(seq)
.fetch_optional(&mut *tx)
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
tx.commit().await.map_err(map_sql_err)?;
return Ok(PopResult::Empty);
};
let value: Option<Vec<u8>> = row.try_get("l_value").map_err(map_sql_err)?;
sqlx::query("DELETE FROM redis_list WHERE r_key = ? AND seq = ?")
.bind(key.as_ref())
.bind(seq)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
let new_len = list_meta.len - 1;
if new_len <= 0 {
delete_list(&mut tx, key).await?;
} else {
let new_head = if left {
list_meta.head + 1
} else {
list_meta.head
};
let new_tail = if left {
list_meta.tail
} else {
list_meta.tail - 1
};
upsert_list_meta(&mut tx, key, new_head, new_tail, new_len).await?;
}
tx.commit().await.map_err(map_sql_err)?;
Ok(PopResult::Value(Bytes::from(value.unwrap_or_default())))
}
async fn fetch_list_meta(
tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
key: &Bytes,
) -> Result<Option<ListMeta>, RespError> {
let row = sqlx::query("SELECT head_seq, tail_seq, len FROM redis_list_meta WHERE r_key = ?")
.bind(key.as_ref())
.fetch_optional(&mut **tx)
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
return Ok(None);
};
Ok(Some(ListMeta {
head: row.try_get("head_seq").map_err(map_sql_err)?,
tail: row.try_get("tail_seq").map_err(map_sql_err)?,
len: row.try_get("len").map_err(map_sql_err)?,
}))
}
async fn fetch_list_meta_pool(
pool: &sqlx::MySqlPool,
key: &Bytes,
) -> Result<Option<ListMeta>, RespError> {
let row = sqlx::query("SELECT head_seq, tail_seq, len FROM redis_list_meta WHERE r_key = ?")
.bind(key.as_ref())
.fetch_optional(pool)
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
return Ok(None);
};
Ok(Some(ListMeta {
head: row.try_get("head_seq").map_err(map_sql_err)?,
tail: row.try_get("tail_seq").map_err(map_sql_err)?,
len: row.try_get("len").map_err(map_sql_err)?,
}))
}
async fn upsert_list_meta(
tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
key: &Bytes,
head: i64,
tail: i64,
len: i64,
) -> Result<(), RespError> {
sqlx::query(
"INSERT INTO redis_list_meta (r_key, head_seq, tail_seq, len) \
VALUES (?, ?, ?, ?) \
ON DUPLICATE KEY UPDATE head_seq = VALUES(head_seq), tail_seq = VALUES(tail_seq), len = VALUES(len)",
)
.bind(key.as_ref())
.bind(head)
.bind(tail)
.bind(len)
.execute(&mut **tx)
.await
.map_err(map_sql_err)?;
Ok(())
}
async fn delete_list(
tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
key: &Bytes,
) -> Result<(), RespError> {
sqlx::query("DELETE FROM redis_list WHERE r_key = ?")
.bind(key.as_ref())
.execute(&mut **tx)
.await
.map_err(map_sql_err)?;
sqlx::query("DELETE FROM redis_list_meta WHERE r_key = ?")
.bind(key.as_ref())
.execute(&mut **tx)
.await
.map_err(map_sql_err)?;
sqlx::query("DELETE FROM redis_kv WHERE r_key = ?")
.bind(key.as_ref())
.execute(&mut **tx)
.await
.map_err(map_sql_err)?;
Ok(())
}
async fn reindex_list(
tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
key: &Bytes,
len: i64,
) -> Result<(), RespError> {
let rows = sqlx::query("SELECT l_value FROM redis_list WHERE r_key = ? ORDER BY seq")
.bind(key.as_ref())
.fetch_all(&mut **tx)
.await
.map_err(map_sql_err)?;
let mut values = Vec::with_capacity(rows.len());
for row in rows {
let value: Option<Vec<u8>> = row.try_get("l_value").map_err(map_sql_err)?;
values.push(value.unwrap_or_default());
}
sqlx::query("DELETE FROM redis_list WHERE r_key = ?")
.bind(key.as_ref())
.execute(&mut **tx)
.await
.map_err(map_sql_err)?;
let mut seq = 0i64;
for value in values {
sqlx::query("INSERT INTO redis_list (r_key, seq, l_value) VALUES (?, ?, ?)")
.bind(key.as_ref())
.bind(seq)
.bind(value)
.execute(&mut **tx)
.await
.map_err(map_sql_err)?;
seq += 2;
}
if len > 0 {
upsert_list_meta(tx, key, 0, (len - 1) * 2, len).await?;
}
Ok(())
}
fn normalize_range(start: i64, stop: i64, len: i64) -> Option<(i64, i64)> {
if len <= 0 {
return None;
}
let mut start = if start < 0 { len + start } else { start };
let mut stop = if stop < 0 { len + stop } else { stop };
if start < 0 {
start = 0;
}
if stop >= len {
stop = len - 1;
}
if start > stop || start >= len {
return None;
}
let count = stop - start + 1;
Some((start, count))
}