use crate::conn::Conn;
use crate::message::{Agg, Gathered, KeyShape, MultiOp};
use kevy_hash::KevyHash;
use kevy_resp::{
encode_array_len, encode_bulk, encode_error, encode_integer, encode_null_bulk,
encode_simple_string,
};
use std::collections::{HashMap, HashSet};
const WRONGTYPE: &str = "WRONGTYPE Operation against a key holding the wrong kind of value";
pub(crate) fn materialize(agg: Agg) -> Vec<u8> {
match agg {
Agg::First(Some(b)) => b,
Agg::First(None) => {
let mut out = Vec::new();
encode_error(&mut out, "ERR internal error");
out
}
Agg::SumInt(n) => {
let mut out = Vec::new();
encode_integer(&mut out, n);
out
}
Agg::AllOk => {
let mut out = Vec::new();
encode_simple_string(&mut out, "OK");
out
}
Agg::Gather { op, keys, got } => finalize_gather(op, keys, got),
Agg::Keys { shape, acc } => finalize_keys(shape, acc),
}
}
fn finalize_keys(shape: KeyShape, acc: Vec<Vec<u8>>) -> Vec<u8> {
let mut out = Vec::new();
match shape {
KeyShape::Keys => {
encode_array_len(&mut out, acc.len() as i64);
for k in &acc {
encode_bulk(&mut out, k);
}
}
KeyShape::Scan => {
encode_array_len(&mut out, 2);
encode_bulk(&mut out, b"0");
encode_array_len(&mut out, acc.len() as i64);
for k in &acc {
encode_bulk(&mut out, k);
}
}
KeyShape::Random => match acc.first() {
Some(k) => encode_bulk(&mut out, k),
None => encode_null_bulk(&mut out),
},
}
out
}
fn finalize_gather(op: MultiOp, keys: Vec<Vec<u8>>, got: HashMap<Vec<u8>, Gathered>) -> Vec<u8> {
let mut out = Vec::new();
match op {
MultiOp::Mget => {
encode_array_len(&mut out, keys.len() as i64);
for k in &keys {
match got.get(k) {
Some(Gathered::Str(Some(v))) => encode_bulk(&mut out, v),
_ => encode_null_bulk(&mut out), }
}
}
_ => {
let mut sets: Vec<Vec<Vec<u8>>> = Vec::with_capacity(keys.len());
for k in &keys {
match got.get(k) {
Some(Gathered::Members(m)) => sets.push(m.clone()),
Some(Gathered::WrongType) => {
encode_error(&mut out, WRONGTYPE);
return out;
}
_ => sets.push(Vec::new()), }
}
let result = match op {
MultiOp::SInter => set_intersect(&sets),
MultiOp::SUnion => set_union(&sets),
MultiOp::SDiff => set_diff(&sets),
MultiOp::Mget => unreachable!(),
};
encode_array_len(&mut out, result.len() as i64);
for m in &result {
encode_bulk(&mut out, m);
}
}
}
out
}
pub(crate) fn pubsub_message(channel: &[u8], msg: &[u8]) -> Vec<u8> {
let mut out = Vec::new();
encode_array_len(&mut out, 3);
encode_bulk(&mut out, b"message");
encode_bulk(&mut out, channel);
encode_bulk(&mut out, msg);
out
}
fn set_intersect(sets: &[Vec<Vec<u8>>]) -> Vec<Vec<u8>> {
let Some((first, rest)) = sets.split_first() else {
return Vec::new();
};
let mut acc: HashSet<&Vec<u8>> = first.iter().collect();
for s in rest {
let other: HashSet<&Vec<u8>> = s.iter().collect();
acc.retain(|m| other.contains(*m));
}
acc.into_iter().cloned().collect()
}
fn set_union(sets: &[Vec<Vec<u8>>]) -> Vec<Vec<u8>> {
let mut acc: HashSet<&Vec<u8>> = HashSet::new();
for s in sets {
for m in s {
acc.insert(m);
}
}
acc.into_iter().cloned().collect()
}
fn set_diff(sets: &[Vec<Vec<u8>>]) -> Vec<Vec<u8>> {
let Some((first, rest)) = sets.split_first() else {
return Vec::new();
};
let mut acc: HashSet<&Vec<u8>> = first.iter().collect();
for s in rest {
for m in s {
acc.remove(m);
}
}
acc.into_iter().cloned().collect()
}
pub(crate) fn drain_front(conn: &mut Conn) {
while matches!(conn.pending.front(), Some(s) if s.done.is_some()) {
let slot = conn.pending.pop_front().unwrap();
if let Some(bytes) = slot.done {
conn.output.extend_from_slice(&bytes);
}
conn.next_emit += 1;
}
}
#[inline]
pub(crate) fn shard_of(key: &[u8], n: usize) -> usize {
if n == 1 {
return 0;
}
let h = key.kevy_hash();
if n.is_power_of_two() {
(h as usize) & (n - 1)
} else {
(h as usize) % n
}
}