#![expect(
dead_code,
reason = "stream BlockKind / BlockHint variants land in v2-7d.3 / .4"
)]
use crate::Commands;
use crate::shard::Shard;
use kevy_resp::{Argv, RespVersion};
use std::collections::{HashMap, VecDeque};
use std::time::{SystemTime, UNIX_EPOCH};
#[inline]
pub(crate) fn unix_now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
pub(crate) fn encode_block_timeout(out: &mut Vec<u8>, kind: BlockKind, proto: RespVersion) {
match (proto, kind) {
(RespVersion::V3, _) => out.extend_from_slice(b"_\r\n"),
(RespVersion::V2, BlockKind::Blpop | BlockKind::Brpop) => {
out.extend_from_slice(b"*-1\r\n")
}
(RespVersion::V2, BlockKind::XReadBlock | BlockKind::XReadGroupBlock) => {
out.extend_from_slice(b"$-1\r\n")
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BlockKind {
Blpop,
Brpop,
XReadBlock,
XReadGroupBlock,
}
#[derive(Clone, Debug, Default)]
pub enum BlockHint {
#[default]
None,
Block {
kind: BlockKind,
key: Vec<u8>,
timeout_ms: u64,
},
}
pub(crate) struct BlockedClient {
pub(crate) conn_id: u64,
pub(crate) kind: BlockKind,
pub(crate) deadline_ms: u64,
pub(crate) argv: Argv,
pub(crate) proto: RespVersion,
}
#[derive(Default)]
pub(crate) struct BlockedClients {
by_key: HashMap<Vec<u8>, VecDeque<BlockedClient>>,
by_conn: HashMap<u64, Vec<Vec<u8>>>,
}
impl BlockedClients {
pub(crate) fn new() -> Self {
Self::default()
}
#[inline]
pub(crate) fn is_empty(&self) -> bool {
self.by_key.is_empty()
}
#[inline]
pub(crate) fn is_watched(&self, key: &[u8]) -> bool {
self.by_key.contains_key(key)
}
pub(crate) fn add(
&mut self,
conn_id: u64,
keys: &[Vec<u8>],
kind: BlockKind,
deadline_ms: u64,
argv: Argv,
proto: RespVersion,
) {
for key in keys {
let bc = BlockedClient {
conn_id,
kind,
deadline_ms,
argv: argv.clone(),
proto,
};
self.by_key.entry(key.clone()).or_default().push_back(bc);
}
self.by_conn.insert(conn_id, keys.to_vec());
}
pub(crate) fn pop_oldest_on_key(&mut self, key: &[u8]) -> Option<BlockedClient> {
let queue = self.by_key.get_mut(key)?;
let waiter = queue.pop_front();
if queue.is_empty() {
self.by_key.remove(key);
}
waiter
}
pub(crate) fn drop_for_conn(&mut self, conn_id: u64) {
let Some(keys) = self.by_conn.remove(&conn_id) else {
return;
};
for key in keys {
let Some(queue) = self.by_key.get_mut(&key) else {
continue;
};
queue.retain(|w| w.conn_id != conn_id);
if queue.is_empty() {
self.by_key.remove(&key);
}
}
}
pub(crate) fn pop_expired(&mut self, now_ms: u64) -> Vec<BlockedClient> {
let conns = self.expired_conn_ids(now_ms);
let mut out = Vec::with_capacity(conns.len());
for conn_id in conns {
if let Some(rep) = self.representative(conn_id) {
out.push(rep);
}
self.drop_for_conn(conn_id);
}
out
}
fn expired_conn_ids(&self, now_ms: u64) -> Vec<u64> {
let mut seen: Vec<u64> = Vec::new();
for queue in self.by_key.values() {
for w in queue {
if w.deadline_ms <= now_ms && !seen.contains(&w.conn_id) {
seen.push(w.conn_id);
}
}
}
seen
}
fn representative(&self, conn_id: u64) -> Option<BlockedClient> {
let keys = self.by_conn.get(&conn_id)?;
let first_key = keys.first()?;
let queue = self.by_key.get(first_key)?;
queue
.iter()
.find(|w| w.conn_id == conn_id)
.map(|w| BlockedClient {
conn_id: w.conn_id,
kind: w.kind,
deadline_ms: w.deadline_ms,
argv: w.argv.clone(),
proto: w.proto,
})
}
}
impl<C: Commands> Shard<C> {
pub(crate) fn tick_blocked_timeouts(&mut self) {
if self.blocked.is_empty() {
return;
}
let now_ms = unix_now_ms();
for w in self.blocked.pop_expired(now_ms) {
let Some(conn) = self.conns.get_mut(&w.conn_id) else {
continue;
};
conn.blocked = false;
encode_block_timeout(&mut conn.output, w.kind, w.proto);
self.dirty.push(w.conn_id);
}
}
pub(crate) fn wake_blocked_on_key(&mut self, key: &[u8]) {
if self.blocked.is_empty() {
return;
}
let Some(waiter) = self.blocked.pop_oldest_on_key(key) else {
return;
};
self.blocked.drop_for_conn(waiter.conn_id);
let Some(conn) = self.conns.get_mut(&waiter.conn_id) else {
return;
};
conn.blocked = false;
let proto = waiter.proto;
match proto {
RespVersion::V2 => self
.commands
.dispatch_into(&mut self.store, &waiter.argv, &mut conn.output),
RespVersion::V3 => self
.commands
.dispatch_into_resp3(&mut self.store, &waiter.argv, &mut conn.output),
}
conn.next_emit += 1;
self.dirty.push(waiter.conn_id);
}
}