Skip to main content

dynomite/proto/memcache/
fragment.rs

//! Memcached request fragmenter.
//!
//! Multi-key `get`/`gets` requests must be split into one
//! sub-request per backend shard. The fragmenter walks the parsed
//! key list, groups keys by their shard index, and emits one
//! sub-request per shard, framed with the original verb (`get` or
//! `gets`) and a trailing CRLF.
//!
//! Sharding is delegated to a [`FragmentDispatcher`] so the
//! fragmenter does not depend on the cluster layer (which lands
//! in Stage 10). Tests pass an in-memory dispatcher; the real
//! engine supplies one backed by `dnode_peer_idx_for_key_on_rack`.
13
14#![allow(clippy::cast_possible_truncation)]
15
16use crate::io::mbuf::{Mbuf, MbufPool};
17use crate::msg::{KeyPos, Msg, MsgType};
18
19use super::commands::memcache_retrieval;
20
/// Routing strategy consulted by [`memcache_fragment`] to decide which
/// shard each key of a multi-key request belongs to.
///
/// Keys whose routing tags map to the same shard index are placed in the
/// same fragment.
pub trait FragmentDispatcher {
    /// Map a key's routing tag (`key_tag`) to a shard index.
    fn shard_for(&self, key_tag: &[u8]) -> u32;

    /// Number of shards. Expected to exceed every index that
    /// [`shard_for`](Self::shard_for) can return, i.e. to be at least
    /// `max(shard_for) + 1`.
    fn shard_count(&self) -> u32;
}
30
31/// Outcome of [`memcache_fragment`].
32#[derive(Debug)]
33pub struct FragmentOutcome {
34    /// Sub-messages to send out, one per participating shard.
35    pub fragments: Vec<Msg>,
36    /// Per-input-key shard index, in input order.
37    pub shard_for_key: Vec<u32>,
38    /// Fragment id assigned to every emitted sub-message.
39    pub frag_id: u64,
40}
41
/// Counter backing [`next_frag_id`]; the first id it hands out is 1.
static FRAG_ID_SEED: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);

/// Allocate a fresh fragment id.
///
/// Each call atomically increments [`FRAG_ID_SEED`] and returns the value
/// it held before the increment. Concurrent callers therefore receive
/// distinct ids until the counter wraps after 2^64 allocations.
/// `Relaxed` ordering is enough because the id only needs to be unique;
/// it does not order any other memory accesses.
fn next_frag_id() -> u64 {
    use std::sync::atomic::Ordering;

    FRAG_ID_SEED.fetch_add(1, Ordering::Relaxed)
}
47
48/// Fragment a multi-key Memcached `get` / `gets` request.
49///
50/// Returns `Ok(None)` for non-retrieval requests (the reference
51/// engine returns `DN_OK` and leaves the message unfragmented).
52/// Returns `Ok(Some(FragmentOutcome))` for retrieval requests with
53/// at least one parsed key. Each emitted fragment carries a fully
54/// formed wire frame (`get k1 k2 ...\r\n` or `gets k1 k2 ...\r\n`)
55/// in its mbuf chain.
56///
57/// # Errors
58///
59/// Returns [`FragmentError::EmptyKeys`] when called on a
60/// retrieval request that has no parsed keys (which should be
61/// impossible if the parser ran successfully).
62/// Returns [`FragmentError::UnsupportedType`] if the message type
63/// is not a memcache retrieval request after the initial check.
64///
65/// # Examples
66///
67/// ```
68/// use dynomite::io::mbuf::MbufPool;
69/// use dynomite::msg::{KeyPos, Msg, MsgType};
70/// use dynomite::proto::memcache::{memcache_fragment, FragmentDispatcher};
71///
72/// struct Dispatcher;
73/// impl FragmentDispatcher for Dispatcher {
74///     fn shard_for(&self, key: &[u8]) -> u32 {
75///         u32::from(key[0]) % 2
76///     }
77///     fn shard_count(&self) -> u32 { 2 }
78/// }
79///
80/// let pool = MbufPool::default();
81/// let mut req = Msg::new(0, MsgType::ReqMcGet, true);
82/// req.push_key(KeyPos::without_tag(b"a".to_vec()));
83/// req.push_key(KeyPos::without_tag(b"b".to_vec()));
84/// let outcome = memcache_fragment(&mut req, &Dispatcher, &pool).unwrap().unwrap();
85/// assert_eq!(outcome.fragments.len(), 2);
86/// ```
87pub fn memcache_fragment<D: FragmentDispatcher + ?Sized>(
88    r: &mut Msg,
89    dispatcher: &D,
90    pool: &MbufPool,
91) -> Result<Option<FragmentOutcome>, FragmentError> {
92    if !memcache_retrieval(r.ty()) {
93        return Ok(None);
94    }
95    if r.keys().is_empty() {
96        return Err(FragmentError::EmptyKeys);
97    }
98
99    let n_shards = dispatcher.shard_count() as usize;
100    let mut bucket: Vec<Option<usize>> = vec![None; n_shards.max(1)];
101    let mut fragments: Vec<Msg> = Vec::new();
102    let mut keys_per_fragment: Vec<Vec<KeyPos>> = Vec::new();
103    let mut shard_for_key = Vec::with_capacity(r.keys().len());
104
105    let frag_id = next_frag_id();
106
107    for key in r.keys() {
108        let idx = dispatcher.shard_for(key.tag_bytes()) as usize;
109        if idx >= bucket.len() {
110            bucket.resize(idx + 1, None);
111        }
112        let bucket_idx = if let Some(j) = bucket.get(idx).copied().flatten() {
113            j
114        } else {
115            let mut sub = Msg::new(0, r.ty(), true);
116            sub.set_frag_id(frag_id);
117            fragments.push(sub);
118            keys_per_fragment.push(Vec::new());
119            let j = fragments.len() - 1;
120            if let Some(slot) = bucket.get_mut(idx) {
121                *slot = Some(j);
122            }
123            j
124        };
125        if let Some(klist) = keys_per_fragment.get_mut(bucket_idx) {
126            klist.push(clone_keypos(key));
127        }
128        shard_for_key.push(u32::try_from(idx).unwrap_or(u32::MAX));
129    }
130
131    // Emit the wire frame for each fragment, then attach key
132    // metadata and recompute the parsed-token count.
133    for (i, frag) in fragments.iter_mut().enumerate() {
134        let keys = keys_per_fragment.get(i).cloned().unwrap_or_default();
135        encode_fragment(frag, &keys, pool)?;
136        for k in &keys {
137            frag.push_key(clone_keypos(k));
138        }
139        let nkeys = u32::try_from(keys.len()).unwrap_or(u32::MAX);
140        // ntokens for memcache get/gets is 1 (verb) + N keys.
141        frag.set_ntokens(1u32.saturating_add(nkeys));
142    }
143
144    r.set_frag_id(frag_id);
145    Ok(Some(FragmentOutcome {
146        fragments,
147        shard_for_key,
148        frag_id,
149    }))
150}
151
152fn clone_keypos(k: &KeyPos) -> KeyPos {
153    KeyPos::new(k.key().to_vec(), k.tag())
154}
155
156fn encode_fragment(frag: &mut Msg, keys: &[KeyPos], pool: &MbufPool) -> Result<(), FragmentError> {
157    let verb: &[u8] = match frag.ty() {
158        MsgType::ReqMcGet => b"get",
159        MsgType::ReqMcGets => b"gets",
160        _ => return Err(FragmentError::UnsupportedType),
161    };
162    let mut buf: Vec<u8> = Vec::new();
163    buf.extend_from_slice(verb);
164    for k in keys {
165        buf.push(b' ');
166        buf.extend_from_slice(k.key());
167    }
168    buf.extend_from_slice(b"\r\n");
169    write_buf_into_chain(frag, pool, &buf);
170    Ok(())
171}
172
173fn write_buf_into_chain(frag: &mut Msg, pool: &MbufPool, mut buf: &[u8]) {
174    while !buf.is_empty() {
175        let mut mb: Mbuf = pool.get();
176        let n = mb.recv(buf);
177        if n == 0 {
178            break;
179        }
180        frag.mbufs_mut().push_back(mb);
181        buf = &buf[n..];
182    }
183    frag.recompute_mlen();
184}
185
186/// Errors produced by [`memcache_fragment`].
187#[derive(Copy, Clone, Debug, Eq, PartialEq, thiserror::Error)]
188#[non_exhaustive]
189pub enum FragmentError {
190    /// Caller invoked the fragmenter on a retrieval request with no
191    /// parsed keys.
192    #[error("memcache fragment: no keys to fragment")]
193    EmptyKeys,
194    /// Caller invoked the fragmenter on an unsupported message type.
195    #[error("memcache fragment: unsupported message type")]
196    UnsupportedType,
197}