use crate::Commands;
use crate::Route;
use crate::message::{Agg, GatherKind, KeyShape, KvPairs, MultiOp, Op};
use crate::reduce::shard_of;
use crate::shard::Shard;
use kevy_resp::{Argv, ArgvView};
use std::collections::HashMap;
impl<C: Commands> Shard<C> {
/// Translates a multi-target `route` into the list of `(shard index, Op)`
/// pairs to dispatch, together with the `Agg` that accompanies them.
///
/// * `args` – the command's argument vector; it is only consulted by the
///   routes that read keys from it (`DelKeys`, `ExistsKeys`, `MSet`, `MGet`,
///   `SInter`, `SUnion`, `SDiff`).
/// * `route` – the routing decision; the payloads of `Keys`, `Scan` and
///   `XReadGather` are moved out of it here.
///
/// Single-target and conn-level routes are not expected in this function.
/// When one shows up, a warning is written to stderr and an empty target
/// list paired with `Agg::First(None)` is returned rather than panicking.
pub(crate) fn build_multi_targets<A: ArgvView + ?Sized>(
    &self,
    args: &A,
    route: Route,
) -> (Vec<(usize, Op)>, Agg) {
    // Plan handed back for routes that should never have been sent here.
    let misrouted = || -> (Vec<(usize, Op)>, Agg) { (Vec::new(), Agg::First(None)) };

    // Addresses one freshly built, payload-free op to every shard index.
    let on_every_shard = |make: fn() -> Op| -> Vec<(usize, Op)> {
        (0..self.nshards).map(|shard| (shard, make())).collect()
    };

    match route {
        // Commands whose keys are grouped by the shard that owns them.
        Route::DelKeys => (self.group_keys(args, Op::Del), Agg::SumInt(0)),
        Route::ExistsKeys => (self.group_keys(args, Op::Exists), Agg::SumInt(0)),
        Route::MSet => self.build_mset_targets(args),
        Route::MGet => self.build_gather(args, GatherKind::Str, MultiOp::Mget),
        Route::SInter => self.build_gather(args, GatherKind::Set, MultiOp::SInter),
        Route::SUnion => self.build_gather(args, GatherKind::Set, MultiOp::SUnion),
        Route::SDiff => self.build_gather(args, GatherKind::Set, MultiOp::SDiff),

        // Commands sent unchanged to every shard.
        Route::Dbsize => (on_every_shard(|| Op::Dbsize), Agg::SumInt(0)),
        Route::Flush => (on_every_shard(|| Op::Flush), Agg::AllOk),
        Route::Save => (on_every_shard(|| Op::Save), Agg::AllOk),
        Route::RewriteAof => (on_every_shard(|| Op::RewriteAof), Agg::AllOk),

        // Key enumeration across all shards.
        Route::Keys(pat) => self.fanout_keys(pat, None, KeyShape::Keys),
        Route::Scan(pat) => self.fanout_keys(pat, None, KeyShape::Scan),
        Route::RandomKey => self.fanout_keys(None, Some(1), KeyShape::Random),

        // One sub-read per requested stream.
        Route::XReadGather { streams, count } => self.build_xread_targets(streams, count),

        // Everything below signals a routing bug upstream.
        Route::Local | Route::Single(_) => {
            eprintln!(
                "kevy WARN: build_multi_targets reached single-target route {route:?} \
                 — routing bug; replying nil to the client"
            );
            misrouted()
        }
        Route::Subscribe
        | Route::Unsubscribe
        | Route::Psubscribe
        | Route::Punsubscribe
        | Route::Publish
        | Route::Watch
        | Route::Unwatch
        | Route::Hello
        | Route::Rename { .. }
        | Route::Slowlog(_) => {
            eprintln!(
                "kevy WARN: build_multi_targets reached conn-level route {route:?} \
                 — routing bug; replying nil to the client"
            );
            misrouted()
        }
    }
}
/// Splits a multi-stream read into one `XREAD` sub-command per stream.
///
/// * `streams` – `(key, cursor)` pairs in request order.
/// * `count` – optional limit; when present it is rendered in decimal and
///   forwarded to every sub-command as `COUNT <n>`.
///
/// Each sub-command is addressed to the shard `shard_of` selects for its key
/// and wrapped in `Op::XReadOne`, tagged with the pair's position in
/// `streams`. The returned `Agg::XReadGather` starts with one `None` slot per
/// stream (presumably filled by position as replies arrive).
fn build_xread_targets(
    &self,
    streams: Vec<(Vec<u8>, Vec<u8>)>,
    count: Option<usize>,
) -> (Vec<(usize, Op)>, Agg) {
    let stream_total = streams.len();
    // Render COUNT once; every sub-command reuses the same bytes.
    let count_arg = count.map(|limit| limit.to_string().into_bytes());

    let mut targets = Vec::with_capacity(stream_total);
    for (position, (key, cursor)) in streams.into_iter().enumerate() {
        let shard = shard_of(&key, self.nshards);

        let mut argv = Argv::default();
        argv.push(b"XREAD");
        if let Some(rendered) = &count_arg {
            argv.push(b"COUNT");
            argv.push(rendered);
        }
        argv.push(b"STREAMS");
        argv.push(&key);
        argv.push(&cursor);

        let op = Op::XReadOne {
            index: position as u32,
            argv,
        };
        targets.push((shard, op));
    }

    let agg = Agg::XReadGather {
        slots: vec![None; stream_total],
    };
    (targets, agg)
}
/// Groups the key/value pairs of an MSET-style argument vector by shard.
///
/// `args[0]` is skipped; the remaining arguments are read as consecutive
/// `(key, value)` pairs. A trailing key without a value is ignored. Every
/// pair is copied into the `KvPairs` of the shard that `shard_of` selects for
/// its key, and each non-empty group becomes one `Op::MSet`. The plan is
/// paired with `Agg::AllOk`.
fn build_mset_targets<A: ArgvView + ?Sized>(
    &self,
    args: &A,
) -> (Vec<(usize, Op)>, Agg) {
    let mut per_shard: HashMap<usize, KvPairs> = HashMap::new();

    // Keys sit at odd indices 1, 3, 5, …; stopping one short of `len`
    // guarantees that `key_at + 1` is always a valid value index.
    for key_at in (1..args.len().saturating_sub(1)).step_by(2) {
        let key = &args[key_at];
        let value = &args[key_at + 1];
        let shard = shard_of(key, self.nshards);
        per_shard
            .entry(shard)
            .or_default()
            .push((key.to_vec(), value.to_vec()));
    }

    let targets = per_shard
        .into_iter()
        .map(|(shard, pairs)| (shard, Op::MSet(pairs)))
        .collect();
    (targets, Agg::AllOk)
}
/// Plans a gather-style command: one `Op::Gather(kind, keys)` per shard that
/// owns at least one of the requested keys.
///
/// * `args` – argument vector; `args[0]` is skipped and every later argument
///   is treated as a key.
/// * `kind` – passed through to each `Op::Gather`.
/// * `op` – stored in the returned `Agg::Gather`.
///
/// The returned `Agg::Gather` also keeps the full key list in request order
/// and starts with an empty `got` map.
fn build_gather<A: ArgvView + ?Sized>(
    &self,
    args: &A,
    kind: GatherKind,
    op: MultiOp,
) -> (Vec<(usize, Op)>, Agg) {
    let mut keys: Vec<Vec<u8>> = Vec::with_capacity(args.len().saturating_sub(1));
    let mut per_shard: HashMap<usize, Vec<Vec<u8>>> = HashMap::new();

    // Single pass: each key is recorded in request order and a copy is
    // filed under the shard that owns it.
    for at in 1..args.len() {
        let key = args[at].to_vec();
        per_shard
            .entry(shard_of(&key, self.nshards))
            .or_default()
            .push(key.clone());
        keys.push(key);
    }

    let mut targets = Vec::with_capacity(per_shard.len());
    for (shard, shard_keys) in per_shard {
        targets.push((shard, Op::Gather(kind, shard_keys)));
    }

    let agg = Agg::Gather {
        op,
        keys,
        got: HashMap::new(),
    };
    (targets, agg)
}
/// Addresses an `Op::CollectKeys(pat, limit)` to every shard index in
/// `0..nshards`.
///
/// * `pat` – optional pattern, cloned into each shard's op.
/// * `limit` – optional limit, copied into each shard's op.
/// * `shape` – stored in the returned `Agg::Keys`, whose accumulator starts
///   empty.
fn fanout_keys(
    &self,
    pat: Option<Vec<u8>>,
    limit: Option<usize>,
    shape: KeyShape,
) -> (Vec<(usize, Op)>, Agg) {
    let mut targets = Vec::with_capacity(self.nshards);
    for shard in 0..self.nshards {
        let op = Op::CollectKeys(pat.clone(), limit);
        targets.push((shard, op));
    }

    let agg = Agg::Keys {
        shape,
        acc: Vec::new(),
    };
    (targets, agg)
}
/// Buckets the keys in `args` by owning shard and wraps each bucket in an op.
///
/// * `args` – argument vector; `args[0]` is skipped and every later argument
///   is treated as a key.
/// * `mk` – constructor turning one shard's key list into an `Op`
///   (for example `Op::Del`).
///
/// Returns one `(shard index, op)` entry per shard that received at least one
/// key. Entries come out in `HashMap` iteration order, so no particular
/// ordering should be relied upon.
pub(crate) fn group_keys<A: ArgvView + ?Sized>(
    &self,
    args: &A,
    mk: fn(Vec<Vec<u8>>) -> Op,
) -> Vec<(usize, Op)> {
    // Lazily pair every key with the shard that owns it.
    let placements = (1..args.len()).map(|at| {
        let key = &args[at];
        (shard_of(key, self.nshards), key.to_vec())
    });

    let mut per_shard: HashMap<usize, Vec<Vec<u8>>> = HashMap::new();
    for (shard, key) in placements {
        per_shard.entry(shard).or_default().push(key);
    }

    let mut targets = Vec::with_capacity(per_shard.len());
    for (shard, shard_keys) in per_shard {
        targets.push((shard, mk(shard_keys)));
    }
    targets
}
}