#![allow(clippy::cast_possible_truncation)]
use crate::io::mbuf::{Mbuf, MbufPool};
use crate::msg::{KeyPos, Msg, MsgType};
use super::commands::memcache_retrieval;
/// Maps key tags onto shard indices so a multi-key request can be split per shard.
///
/// `memcache_fragment` calls an implementor once per key of the request and
/// groups keys that map to the same shard index into the same fragment.
pub trait FragmentDispatcher {
/// Returns the shard index that `key_tag` belongs to.
///
/// `memcache_fragment` passes the key's tag bytes here; keys yielding the
/// same index end up in the same fragment.
fn shard_for(&self, key_tag: &[u8]) -> u32;
/// Returns the number of shards known to this dispatcher.
///
/// `memcache_fragment` only uses this value to size its shard lookup table.
/// NOTE(review): presumably `shard_for` results stay below this value, but
/// nothing visible here enforces that — confirm with the implementors.
fn shard_count(&self) -> u32;
}
/// Result of splitting one multi-key retrieval request into per-shard fragments.
///
/// Produced by `memcache_fragment` when the request was actually fragmented.
#[derive(Debug)]
pub struct FragmentOutcome {
/// One sub-request per distinct shard, ordered by the first time each shard
/// was seen while walking the original request's keys.
pub fragments: Vec<Msg>,
/// Shard index chosen for each key of the original request, in key order.
pub shard_for_key: Vec<u32>,
/// Identifier stamped on the original request and on every fragment.
pub frag_id: u64,
}
/// Process-wide counter backing [`next_frag_id`]; the first id handed out is 1.
static FRAG_ID_SEED: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);

/// Hands out the next fragment-group identifier.
///
/// Returns the counter's value *before* the increment, so consecutive calls
/// yield 1, 2, 3, … (wrapping around on `u64` overflow). Relaxed ordering is
/// used: only the atomicity of the increment matters here, not its ordering
/// relative to other memory operations.
fn next_frag_id() -> u64 {
    use std::sync::atomic::Ordering::Relaxed;

    FRAG_ID_SEED.fetch_add(1, Relaxed)
}
/// Splits a multi-key memcache retrieval request into one sub-request per shard.
///
/// Every key of `r` is routed through `dispatcher.shard_for(key.tag_bytes())`.
/// Keys that map to the same shard are collected into the same fragment, and
/// fragments are emitted in the order their shard was first encountered. Each
/// fragment is re-encoded as its own request line into buffers taken from
/// `pool`, receives copies of its keys, and has its token count set to
/// `1 + number_of_keys` (saturating). On success `r` and all fragments carry
/// the same freshly allocated fragment id.
///
/// # Returns
///
/// * `Ok(None)` when `r` is not a retrieval request (per `memcache_retrieval`);
///   `r` is left untouched.
/// * `Ok(Some(outcome))` with the fragments, the shard chosen for each key (in
///   key order) and the fragment id.
///
/// # Errors
///
/// * [`FragmentError::EmptyKeys`] if `r` has no keys.
/// * Any error from `encode_fragment` (currently
///   [`FragmentError::UnsupportedType`]); in that case `r`'s fragment id is
///   not updated.
pub fn memcache_fragment<D: FragmentDispatcher + ?Sized>(
    r: &mut Msg,
    dispatcher: &D,
    pool: &MbufPool,
) -> Result<Option<FragmentOutcome>, FragmentError> {
    if !memcache_retrieval(r.ty()) {
        return Ok(None);
    }
    if r.keys().is_empty() {
        return Err(FragmentError::EmptyKeys);
    }
    let key_count = r.keys().len();

    // Shard id -> index into `fragments` / `keys_per_fragment`.
    //
    // A hash map keeps memory bounded by the number of keys in the request. A
    // dense table indexed by shard id would have to grow to `shard + 1`
    // entries, so a single out-of-range id (e.g. `u32::MAX`) could demand a
    // multi-gigabyte allocation. The shard count is only a capacity hint.
    let shard_hint = usize::try_from(dispatcher.shard_count()).unwrap_or(usize::MAX);
    let mut fragment_of_shard: std::collections::HashMap<u32, usize> =
        std::collections::HashMap::with_capacity(shard_hint.min(key_count));
    let mut fragments: Vec<Msg> = Vec::new();
    let mut keys_per_fragment: Vec<Vec<KeyPos>> = Vec::new();
    let mut shard_for_key: Vec<u32> = Vec::with_capacity(key_count);
    let frag_id = next_frag_id();

    for key in r.keys() {
        let shard = dispatcher.shard_for(key.tag_bytes());
        // First key seen for a shard creates that shard's fragment.
        let slot = *fragment_of_shard.entry(shard).or_insert_with(|| {
            let mut sub = Msg::new(0, r.ty(), true);
            sub.set_frag_id(frag_id);
            fragments.push(sub);
            keys_per_fragment.push(Vec::new());
            fragments.len() - 1
        });
        if let Some(klist) = keys_per_fragment.get_mut(slot) {
            klist.push(clone_keypos(key));
        }
        shard_for_key.push(shard);
    }

    // `fragments` and `keys_per_fragment` are always pushed together, so
    // zipping pairs every fragment with exactly its own key list. The lists
    // are consumed here: the keys were already copied out of `r` above and can
    // be moved into the fragment instead of being cloned again.
    for (frag, keys) in fragments.iter_mut().zip(keys_per_fragment) {
        encode_fragment(frag, &keys, pool)?;
        let nkeys = u32::try_from(keys.len()).unwrap_or(u32::MAX);
        for k in keys {
            frag.push_key(k);
        }
        // One token for the command verb plus one per key.
        frag.set_ntokens(1u32.saturating_add(nkeys));
    }

    r.set_frag_id(frag_id);
    Ok(Some(FragmentOutcome {
        fragments,
        shard_for_key,
        frag_id,
    }))
}
/// Builds an independent `KeyPos` from `k` by copying its key bytes and
/// passing its tag through `KeyPos::new`.
fn clone_keypos(k: &KeyPos) -> KeyPos {
    let key_bytes = k.key().to_vec();
    let tag = k.tag();
    KeyPos::new(key_bytes, tag)
}
/// Serialises a retrieval request line for `keys` into `frag`'s buffer chain.
///
/// The line has the form `<verb> <key> <key> ...\r\n`, where the verb is
/// `get` for `MsgType::ReqMcGet` and `gets` for `MsgType::ReqMcGets`.
///
/// # Errors
///
/// Returns [`FragmentError::UnsupportedType`] for any other message type;
/// nothing is written to `frag` in that case.
fn encode_fragment(frag: &mut Msg, keys: &[KeyPos], pool: &MbufPool) -> Result<(), FragmentError> {
    // Start the line with the command verb matching the fragment's type.
    let mut line: Vec<u8> = match frag.ty() {
        MsgType::ReqMcGet => b"get".to_vec(),
        MsgType::ReqMcGets => b"gets".to_vec(),
        _ => return Err(FragmentError::UnsupportedType),
    };

    // Each key is preceded by a single space separator.
    for key in keys {
        line.push(b' ');
        line.extend_from_slice(key.key());
    }
    line.extend_from_slice(b"\r\n");

    write_buf_into_chain(frag, pool, &line);
    Ok(())
}
/// Copies `buf` into `frag`'s mbuf chain, drawing fresh buffers from `pool`.
///
/// For as long as bytes remain, a new mbuf is taken from the pool, handed the
/// remaining bytes via `recv`, and appended to the chain; the input then
/// advances by the count `recv` reported. The message length is recomputed
/// once at the end, whether or not everything was copied.
///
/// NOTE(review): if `recv` ever reports zero bytes, copying stops and the
/// rest of `buf` is dropped without any error being surfaced, so the fragment
/// would carry a truncated request — confirm that a pooled mbuf can always
/// accept at least one byte.
fn write_buf_into_chain(frag: &mut Msg, pool: &MbufPool, buf: &[u8]) {
    let mut remaining = buf;
    while !remaining.is_empty() {
        let mut mb: Mbuf = pool.get();
        match mb.recv(remaining) {
            // No progress: bail out instead of spinning forever. The unused
            // mbuf is dropped here and never linked into the chain.
            0 => break,
            copied => {
                frag.mbufs_mut().push_back(mb);
                remaining = &remaining[copied..];
            }
        }
    }
    frag.recompute_mlen();
}
/// Errors reported while fragmenting a memcache retrieval request.
///
/// Marked `#[non_exhaustive]`, so downstream crates must keep a wildcard arm
/// when matching on it.
#[derive(Copy, Clone, Debug, Eq, PartialEq, thiserror::Error)]
#[non_exhaustive]
pub enum FragmentError {
/// The request carries no keys, so there is nothing to distribute.
#[error("memcache fragment: no keys to fragment")]
EmptyKeys,
/// The message type cannot be re-encoded as a fragment; `encode_fragment`
/// only handles `MsgType::ReqMcGet` and `MsgType::ReqMcGets`.
#[error("memcache fragment: unsupported message type")]
UnsupportedType,
}