use crate::Commands;
use crate::Route;
use crate::message::{Agg, GatherKind, KeyShape, KvPairs, MultiOp, Op};
use crate::reduce::shard_of;
use crate::shard::Shard;
use kevy_resp::ArgvView;
use std::collections::HashMap;
impl<C: Commands> Shard<C> {
pub(crate) fn build_multi_targets<A: ArgvView + ?Sized>(
&self,
args: &A,
route: Route,
) -> (Vec<(usize, Op)>, Agg) {
match route {
Route::Local | Route::Single(_) => {
eprintln!(
"kevy WARN: build_multi_targets reached single-target route {route:?} \
— routing bug; replying nil to the client"
);
(Vec::new(), Agg::First(None))
}
Route::Subscribe
| Route::Unsubscribe
| Route::Psubscribe
| Route::Punsubscribe
| Route::Publish
| Route::Watch
| Route::Unwatch
| Route::Hello
| Route::Rename { .. }
| Route::Slowlog(_) => {
eprintln!(
"kevy WARN: build_multi_targets reached conn-level route {route:?} \
— routing bug; replying nil to the client"
);
(Vec::new(), Agg::First(None))
}
Route::DelKeys => (self.group_keys(args, Op::Del), Agg::SumInt(0)),
Route::ExistsKeys => (self.group_keys(args, Op::Exists), Agg::SumInt(0)),
Route::Dbsize => (
(0..self.nshards).map(|s| (s, Op::Dbsize)).collect(),
Agg::SumInt(0),
),
Route::Flush => (
(0..self.nshards).map(|s| (s, Op::Flush)).collect(),
Agg::AllOk,
),
Route::Save => (
(0..self.nshards).map(|s| (s, Op::Save)).collect(),
Agg::AllOk,
),
Route::RewriteAof => (
(0..self.nshards).map(|s| (s, Op::RewriteAof)).collect(),
Agg::AllOk,
),
Route::MSet => self.build_mset_targets(args),
Route::MGet => self.build_gather(args, GatherKind::Str, MultiOp::Mget),
Route::SInter => self.build_gather(args, GatherKind::Set, MultiOp::SInter),
Route::SUnion => self.build_gather(args, GatherKind::Set, MultiOp::SUnion),
Route::SDiff => self.build_gather(args, GatherKind::Set, MultiOp::SDiff),
Route::Keys(pat) => self.fanout_keys(pat, None, KeyShape::Keys),
Route::Scan(pat) => self.fanout_keys(pat, None, KeyShape::Scan),
Route::RandomKey => self.fanout_keys(None, Some(1), KeyShape::Random),
}
}
fn build_mset_targets<A: ArgvView + ?Sized>(
&self,
args: &A,
) -> (Vec<(usize, Op)>, Agg) {
let mut by_shard: HashMap<usize, KvPairs> = HashMap::new();
let mut i = 1;
while i + 1 < args.len() {
by_shard
.entry(shard_of(&args[i], self.nshards))
.or_default()
.push((args[i].to_vec(), args[i + 1].to_vec()));
i += 2;
}
(
by_shard
.into_iter()
.map(|(s, p)| (s, Op::MSet(p)))
.collect(),
Agg::AllOk,
)
}
fn build_gather<A: ArgvView + ?Sized>(
&self,
args: &A,
kind: GatherKind,
op: MultiOp,
) -> (Vec<(usize, Op)>, Agg) {
let keys: Vec<Vec<u8>> = (1..args.len()).map(|i| args[i].to_vec()).collect();
let mut by_shard: HashMap<usize, Vec<Vec<u8>>> = HashMap::new();
for k in &keys {
by_shard
.entry(shard_of(k, self.nshards))
.or_default()
.push(k.clone());
}
let targets = by_shard
.into_iter()
.map(|(s, ks)| (s, Op::Gather(kind, ks)))
.collect();
(
targets,
Agg::Gather {
op,
keys,
got: HashMap::new(),
},
)
}
fn fanout_keys(
&self,
pat: Option<Vec<u8>>,
limit: Option<usize>,
shape: KeyShape,
) -> (Vec<(usize, Op)>, Agg) {
let targets = (0..self.nshards)
.map(|s| (s, Op::CollectKeys(pat.clone(), limit)))
.collect();
(
targets,
Agg::Keys {
shape,
acc: Vec::new(),
},
)
}
pub(crate) fn group_keys<A: ArgvView + ?Sized>(
&self,
args: &A,
mk: fn(Vec<Vec<u8>>) -> Op,
) -> Vec<(usize, Op)> {
let mut by_shard: HashMap<usize, Vec<Vec<u8>>> = HashMap::new();
for i in 1..args.len() {
let key = &args[i];
by_shard
.entry(shard_of(key, self.nshards))
.or_default()
.push(key.to_vec());
}
by_shard
.into_iter()
.map(|(s, keys)| (s, mk(keys)))
.collect()
}
}