dynomite/proto/memcache/
fragment.rs1#![allow(clippy::cast_possible_truncation)]
15
16use crate::io::mbuf::{Mbuf, MbufPool};
17use crate::msg::{KeyPos, Msg, MsgType};
18
19use super::commands::memcache_retrieval;
20
/// Key-to-shard routing used by [`memcache_fragment`] to decide which
/// sub-request each key of a multi-key retrieval belongs to.
pub trait FragmentDispatcher {
    /// Returns the shard index that owns `key_tag`.
    ///
    /// [`memcache_fragment`] calls this once per key, passing the key's tag
    /// bytes, and groups keys that map to the same index into one fragment.
    fn shard_for(&self, key_tag: &[u8]) -> u32;

    /// Returns the number of shards known to the dispatcher.
    ///
    /// [`memcache_fragment`] only uses this as an initial sizing hint; an
    /// index from [`shard_for`](Self::shard_for) that is `>=` this count is
    /// still accepted there.
    fn shard_count(&self) -> u32;
}
30
31#[derive(Debug)]
33pub struct FragmentOutcome {
34 pub fragments: Vec<Msg>,
36 pub shard_for_key: Vec<u32>,
38 pub frag_id: u64,
40}
41
/// Process-wide counter backing [`next_frag_id`]; starts at 1.
static FRAG_ID_SEED: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);

/// Hands out the current counter value and advances the counter by one.
///
/// Only atomicity of the increment is needed here, so the update uses
/// relaxed ordering.
fn next_frag_id() -> u64 {
    use std::sync::atomic::Ordering::Relaxed;

    FRAG_ID_SEED.fetch_add(1, Relaxed)
}
47
48pub fn memcache_fragment<D: FragmentDispatcher + ?Sized>(
88 r: &mut Msg,
89 dispatcher: &D,
90 pool: &MbufPool,
91) -> Result<Option<FragmentOutcome>, FragmentError> {
92 if !memcache_retrieval(r.ty()) {
93 return Ok(None);
94 }
95 if r.keys().is_empty() {
96 return Err(FragmentError::EmptyKeys);
97 }
98
99 let n_shards = dispatcher.shard_count() as usize;
100 let mut bucket: Vec<Option<usize>> = vec![None; n_shards.max(1)];
101 let mut fragments: Vec<Msg> = Vec::new();
102 let mut keys_per_fragment: Vec<Vec<KeyPos>> = Vec::new();
103 let mut shard_for_key = Vec::with_capacity(r.keys().len());
104
105 let frag_id = next_frag_id();
106
107 for key in r.keys() {
108 let idx = dispatcher.shard_for(key.tag_bytes()) as usize;
109 if idx >= bucket.len() {
110 bucket.resize(idx + 1, None);
111 }
112 let bucket_idx = if let Some(j) = bucket.get(idx).copied().flatten() {
113 j
114 } else {
115 let mut sub = Msg::new(0, r.ty(), true);
116 sub.set_frag_id(frag_id);
117 fragments.push(sub);
118 keys_per_fragment.push(Vec::new());
119 let j = fragments.len() - 1;
120 if let Some(slot) = bucket.get_mut(idx) {
121 *slot = Some(j);
122 }
123 j
124 };
125 if let Some(klist) = keys_per_fragment.get_mut(bucket_idx) {
126 klist.push(clone_keypos(key));
127 }
128 shard_for_key.push(u32::try_from(idx).unwrap_or(u32::MAX));
129 }
130
131 for (i, frag) in fragments.iter_mut().enumerate() {
134 let keys = keys_per_fragment.get(i).cloned().unwrap_or_default();
135 encode_fragment(frag, &keys, pool)?;
136 for k in &keys {
137 frag.push_key(clone_keypos(k));
138 }
139 let nkeys = u32::try_from(keys.len()).unwrap_or(u32::MAX);
140 frag.set_ntokens(1u32.saturating_add(nkeys));
142 }
143
144 r.set_frag_id(frag_id);
145 Ok(Some(FragmentOutcome {
146 fragments,
147 shard_for_key,
148 frag_id,
149 }))
150}
151
152fn clone_keypos(k: &KeyPos) -> KeyPos {
153 KeyPos::new(k.key().to_vec(), k.tag())
154}
155
156fn encode_fragment(frag: &mut Msg, keys: &[KeyPos], pool: &MbufPool) -> Result<(), FragmentError> {
157 let verb: &[u8] = match frag.ty() {
158 MsgType::ReqMcGet => b"get",
159 MsgType::ReqMcGets => b"gets",
160 _ => return Err(FragmentError::UnsupportedType),
161 };
162 let mut buf: Vec<u8> = Vec::new();
163 buf.extend_from_slice(verb);
164 for k in keys {
165 buf.push(b' ');
166 buf.extend_from_slice(k.key());
167 }
168 buf.extend_from_slice(b"\r\n");
169 write_buf_into_chain(frag, pool, &buf);
170 Ok(())
171}
172
173fn write_buf_into_chain(frag: &mut Msg, pool: &MbufPool, mut buf: &[u8]) {
174 while !buf.is_empty() {
175 let mut mb: Mbuf = pool.get();
176 let n = mb.recv(buf);
177 if n == 0 {
178 break;
179 }
180 frag.mbufs_mut().push_back(mb);
181 buf = &buf[n..];
182 }
183 frag.recompute_mlen();
184}
185
186#[derive(Copy, Clone, Debug, Eq, PartialEq, thiserror::Error)]
188#[non_exhaustive]
189pub enum FragmentError {
190 #[error("memcache fragment: no keys to fragment")]
193 EmptyKeys,
194 #[error("memcache fragment: unsupported message type")]
196 UnsupportedType,
197}