use crate::Commands;
use crate::blocked::{BlockKind, encode_block_timeout, unix_now_ms};
use crate::message::Inbound;
use crate::reduce::shard_of;
use crate::shard::Shard;
use kevy_resp::{Argv, ArgvView, RespVersion};
use std::collections::HashMap;
pub(crate) struct OriginBlock {
pub(crate) kind: BlockKind,
pub(crate) deadline_ms: u64,
pub(crate) proto: RespVersion,
pub(crate) serving: bool,
pub(crate) keys: Vec<OriginKey>,
}
pub(crate) struct OriginKey {
pub(crate) key: Vec<u8>,
pub(crate) shard: usize,
pub(crate) serve_argv: Argv,
}
struct XWaiter {
origin: usize,
conn: u64,
kind: BlockKind,
serve_argv: Argv,
proto: RespVersion,
}
#[derive(Default)]
pub(crate) struct XShardWaiters {
by_key: HashMap<Vec<u8>, Vec<XWaiter>>,
by_conn: HashMap<(usize, u64), Vec<Vec<u8>>>,
}
impl XShardWaiters {
#[inline]
pub(crate) fn is_empty(&self) -> bool {
self.by_key.is_empty()
}
#[inline]
pub(crate) fn is_watched(&self, key: &[u8]) -> bool {
self.by_key.contains_key(key)
}
fn arm(&mut self, key: &[u8], w: XWaiter) {
let id = (w.origin, w.conn);
let q = self.by_key.entry(key.to_vec()).or_default();
if let Some(slot) = q.iter_mut().find(|e| (e.origin, e.conn) == id) {
slot.serve_argv = w.serve_argv;
slot.proto = w.proto;
slot.kind = w.kind;
} else {
q.push(w);
self.by_conn.entry(id).or_default().push(key.to_vec());
}
}
fn waiters_on(&self, key: &[u8]) -> Vec<(usize, u64)> {
self.by_key
.get(key)
.map(|q| q.iter().map(|w| (w.origin, w.conn)).collect())
.unwrap_or_default()
}
fn serve_argv(&self, key: &[u8], origin: usize, conn: u64) -> Option<(Argv, RespVersion)> {
self.by_key.get(key).and_then(|q| {
q.iter()
.find(|w| w.origin == origin && w.conn == conn)
.map(|w| (w.serve_argv.clone(), w.proto))
})
}
fn drop_for(&mut self, origin: usize, conn: u64) {
let Some(keys) = self.by_conn.remove(&(origin, conn)) else {
return;
};
for key in keys {
if let Some(q) = self.by_key.get_mut(&key) {
q.retain(|w| !(w.origin == origin && w.conn == conn));
if q.is_empty() {
self.by_key.remove(&key);
}
}
}
}
}
impl<C: Commands> Shard<C> {
pub(crate) fn park_blocked_xshard(
&mut self,
conn_id: u64,
kind: BlockKind,
entries: Vec<(Vec<u8>, Argv)>,
deadline_ms: u64,
proto: RespVersion,
) {
let keys: Vec<OriginKey> = entries
.into_iter()
.map(|(key, serve_argv)| OriginKey {
shard: shard_of(&key, self.nshards),
key,
serve_argv,
})
.collect();
if let Some(conn) = self.conns.get_mut(&conn_id) {
conn.blocked = true;
}
let arms: Vec<(usize, Vec<u8>, Argv)> = keys
.iter()
.map(|k| (k.shard, k.key.clone(), k.serve_argv.clone()))
.collect();
self.origin_blocks.insert(
conn_id,
OriginBlock { kind, deadline_ms, proto, serving: false, keys },
);
self.arm_and_maybe_serve(conn_id, kind, proto, arms);
}
fn arm_and_maybe_serve(
&mut self,
conn: u64,
kind: BlockKind,
proto: RespVersion,
arms: Vec<(usize, Vec<u8>, Argv)>,
) {
let mut local_ready: Vec<Vec<u8>> = Vec::new();
for (shard, key, serve_argv) in arms {
if shard == self.id {
if self.target_register(self.id, conn, &key, kind, serve_argv, proto) {
local_ready.push(key);
}
} else {
self.send_to(
shard,
Inbound::BlockArm { origin: self.id, conn, key, kind, serve_argv, proto },
);
}
}
for key in local_ready {
if !self.origin_blocks.contains_key(&conn) {
break;
}
self.origin_on_ready(conn, &key);
}
}
pub(crate) fn origin_on_ready(&mut self, conn: u64, key: &[u8]) {
let Some(ob) = self.origin_blocks.get_mut(&conn) else {
return;
};
if ob.serving {
return;
}
let Some(shard) = ob.keys.iter().find(|k| k.key == key).map(|k| k.shard) else {
return; };
ob.serving = true;
if shard == self.id {
let reply = self.target_serve(self.id, conn, key);
self.origin_on_serve_resp(conn, key.to_vec(), reply);
} else {
self.send_to(
shard,
Inbound::BlockServeReq {
origin: self.id,
conn,
key: key.to_vec(),
},
);
}
}
pub(crate) fn origin_on_serve_resp(&mut self, conn: u64, _key: Vec<u8>, reply: Vec<u8>) {
let Some(ob) = self.origin_blocks.get_mut(&conn) else {
return; };
if reply.is_empty() {
ob.serving = false;
self.rearm_all(conn);
return;
}
self.deliver_block(conn, reply);
}
fn deliver_block(&mut self, conn: u64, reply: Vec<u8>) {
if let Some(c) = self.conns.get_mut(&conn) {
c.blocked = false;
c.output.extend_from_slice(&reply);
c.next_emit += 1;
self.dirty.push(conn);
}
if let Some(ob) = self.origin_blocks.remove(&conn) {
self.broadcast_cancel(conn, &ob.keys);
}
}
fn rearm_all(&mut self, conn: u64) {
let Some(ob) = self.origin_blocks.get(&conn) else {
return;
};
let proto = ob.proto;
let kind = ob.kind;
let arms: Vec<(usize, Vec<u8>, Argv)> = ob
.keys
.iter()
.map(|k| (k.shard, k.key.clone(), k.serve_argv.clone()))
.collect();
self.arm_and_maybe_serve(conn, kind, proto, arms);
}
fn broadcast_cancel(&mut self, conn: u64, keys: &[OriginKey]) {
let mut seen: Vec<usize> = Vec::new();
for k in keys {
if seen.contains(&k.shard) {
continue;
}
seen.push(k.shard);
if k.shard == self.id {
self.xwaiters.drop_for(self.id, conn);
} else {
self.send_to(k.shard, Inbound::BlockCancel { origin: self.id, conn });
}
}
}
pub(crate) fn tick_xshard_timeouts(&mut self) {
if self.origin_blocks.is_empty() {
return;
}
let now = unix_now_ms();
let expired: Vec<u64> = self
.origin_blocks
.iter()
.filter(|(_, ob)| !ob.serving && ob.deadline_ms <= now)
.map(|(&c, _)| c)
.collect();
for conn in expired {
let Some(ob) = self.origin_blocks.remove(&conn) else {
continue;
};
if let Some(c) = self.conns.get_mut(&conn) {
c.blocked = false;
encode_block_timeout(&mut c.output, ob.kind, ob.proto);
c.next_emit += 1;
self.dirty.push(conn);
}
self.broadcast_cancel(conn, &ob.keys);
}
}
pub(crate) fn cancel_xshard_on_close(&mut self, conn: u64) {
if let Some(ob) = self.origin_blocks.remove(&conn) {
self.broadcast_cancel(conn, &ob.keys);
}
}
pub(crate) fn target_arm(
&mut self,
origin: usize,
conn: u64,
key: Vec<u8>,
kind: BlockKind,
serve_argv: Argv,
proto: RespVersion,
) {
if self.target_register(origin, conn, &key, kind, serve_argv, proto) {
self.signal_ready(origin, conn, &key);
}
}
fn target_register(
&mut self,
origin: usize,
conn: u64,
key: &[u8],
kind: BlockKind,
serve_argv: Argv,
proto: RespVersion,
) -> bool {
let frozen = self
.commands
.resolve_block_argv(&mut self.store, &serve_argv, kind);
let ready = self.commands.block_ready(&mut self.store, &frozen, kind);
self.xwaiters.arm(
key,
XWaiter {
origin,
conn,
kind,
serve_argv: frozen,
proto,
},
);
ready
}
pub(crate) fn target_wake_xshard(&mut self, key: &[u8]) {
for (origin, conn) in self.xwaiters.waiters_on(key) {
self.signal_ready(origin, conn, key);
}
}
fn signal_ready(&mut self, origin: usize, conn: u64, key: &[u8]) {
if origin == self.id {
self.origin_on_ready(conn, key);
} else {
self.send_to(origin, Inbound::BlockReady { conn, key: key.to_vec() });
}
}
pub(crate) fn target_serve(&mut self, origin: usize, conn: u64, key: &[u8]) -> Vec<u8> {
let Some((argv, proto)) = self.xwaiters.serve_argv(key, origin, conn) else {
return Vec::new();
};
let mut reply = Vec::new();
match proto {
RespVersion::V2 => self.commands.dispatch_into(&mut self.store, &argv, &mut reply),
RespVersion::V3 => self
.commands
.dispatch_into_resp3(&mut self.store, &argv, &mut reply),
}
reply
}
pub(crate) fn target_cancel(&mut self, origin: usize, conn: u64) {
self.xwaiters.drop_for(origin, conn);
}
}
pub(crate) fn build_serve_entries<C: Commands, A: ArgvView + ?Sized>(
commands: &C,
args: &A,
kind: BlockKind,
keys: &[Vec<u8>],
) -> Vec<(Vec<u8>, Argv)> {
keys.iter()
.map(|k| (k.clone(), commands.block_serve_argv(args, kind, k)))
.collect()
}