/// Smallest declared body length (bytes) of the final bulk argument for which
/// `probe_generic_bigbulk` reports a promotion: 4 KiB.
pub(crate) const BIG_ARG_PROMOTE_THRESHOLD: usize = 4 << 10;
/// Largest declared bulk body length (bytes) `probe_generic_bigbulk` will
/// promote: 512 MiB.
/// NOTE(review): equals Redis' default `proto-max-bulk-len`; presumably
/// intentional — confirm.
pub(crate) const MAX_BULK_LEN: usize = 512 << 20;
/// Upper bound on the multibulk argument count (`*<argc>`) the probe will walk.
const MAX_PROBE_BULKS: usize = 1024;
/// Outcome of probing a buffer for a command whose final bulk argument is large
/// and not yet fully received.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BigArgGenericProbe {
    /// The buffer does not hold a promotable command. This includes the case
    /// where the command is already fully buffered.
    NotApplicable,
    /// The command should be promoted to the big-argument path.
    Promote {
        /// Byte length of the complete command, measured from the start of the
        /// buffer through the final bulk's trailing CRLF.
        total: usize,
        /// Number of bytes of the command already present in the buffer.
        bytes_present: usize,
    },
}
/// Parses an unsigned decimal integer from `buf`, starting at byte offset `i`.
///
/// Consumes the longest run of ASCII digits beginning at `i`.
///
/// Returns `Some((value, end))`, where `end` is the offset of the first byte
/// after the digits. Returns `None` in two cases:
/// * there is no digit at `i`, including when `i` is at or past the end of `buf`;
/// * the value does not fit in a `usize`.
pub(crate) fn parse_decimal_at(buf: &[u8], mut i: usize) -> Option<(usize, usize)> {
    let start = i;
    let mut n: usize = 0;
    while let Some(digit) = buf.get(i).filter(|b| b.is_ascii_digit()) {
        // The digits may be client-supplied lengths, so overflow must be
        // rejected. A plain `n * 10 + d` panics in debug builds and wraps in
        // release, e.g. for "18446744073709551616" on 64-bit targets.
        n = n.checked_mul(10)?.checked_add(usize::from(digit - b'0'))?;
        i += 1;
    }
    if i == start {
        return None;
    }
    Some((n, i))
}
/// Reads the command-name bulk string (`$<len>\r\n<name>\r\n`) located at `start`.
///
/// Only names of 1 to 16 bytes are accepted. On success it returns a tuple of:
/// * the name upper-cased into a zero-padded 16-byte array;
/// * the name's length;
/// * the offset just past the name's trailing CRLF.
///
/// Returns `None` if the bulk is malformed, empty, longer than 16 bytes, or
/// not yet fully present in `buf`.
fn probe_verb_bulk(buf: &[u8], start: usize) -> Option<([u8; 16], usize, usize)> {
    if *buf.get(start)? != b'$' {
        return None;
    }
    let (name_len, digits_end) = parse_decimal_at(buf, start + 1)?;
    if !(1..=16).contains(&name_len) {
        return None;
    }
    let name_start = digits_end + 2;
    if buf.get(digits_end..name_start)? != b"\r\n" {
        return None;
    }
    let name_end = name_start + name_len;
    let name = buf.get(name_start..name_end)?;
    if buf.get(name_end..name_end + 2)? != b"\r\n" {
        return None;
    }
    let mut upper = [0u8; 16];
    upper[..name_len].copy_from_slice(name);
    upper[..name_len].make_ascii_uppercase();
    Some((upper, name_len, name_end + 2))
}
/// Reports whether `verb` is one of the commands this probe may promote.
///
/// Matching is exact and case-sensitive. The caller passes an upper-cased verb.
fn generic_bigbulk_verb_supported(verb: &[u8]) -> bool {
    const PROMOTABLE: [&[u8]; 6] = [b"SET", b"APPEND", b"GETSET", b"SETEX", b"PSETEX", b"MSET"];
    PROMOTABLE.contains(&verb)
}
/// Result of examining a single RESP bulk string at some offset in a buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BulkStep {
    /// Header, body and trailing CRLF are all present.
    Complete {
        /// Offset just past the bulk's trailing CRLF.
        after: usize,
    },
    /// The `$<len>\r\n` header is present, but the body and/or its trailing
    /// CRLF are not yet fully buffered. This variant is produced regardless of
    /// the body's size; callers apply any size thresholds.
    HeaderOnlyBigBody {
        /// Declared body length from the header.
        body_len: usize,
        /// Offset just past the trailing CRLF once the bulk has fully arrived.
        after: usize,
    },
    /// No usable bulk at this offset. Possible causes: a missing or garbled
    /// header, a wrong terminator, or an unrepresentable length.
    Incomplete,
}
/// Examines the RESP bulk string (`$<len>\r\n<body>\r\n`) that starts at offset `i`.
///
/// Returns:
/// * `Complete { after }` when header, body and trailing CRLF are all in `buf`.
///   `after` is the offset just past the trailing CRLF.
/// * `HeaderOnlyBigBody { body_len, after }` when the header is complete but
///   the body and/or its trailing CRLF have not fully arrived. `after` is
///   where the bulk will end once they do.
/// * `Incomplete` in all other cases: no `$` at `i`, an unfinished or malformed
///   header, a wrong trailing terminator, or a declared length so large that
///   the end offset would overflow `usize`.
fn step_bulk(buf: &[u8], i: usize) -> BulkStep {
    if buf.get(i) != Some(&b'$') {
        return BulkStep::Incomplete;
    }
    let Some((body_len, after_len_digits)) = parse_decimal_at(buf, i + 1) else {
        return BulkStep::Incomplete;
    };
    if buf.get(after_len_digits..after_len_digits + 2) != Some(&b"\r\n"[..]) {
        return BulkStep::Incomplete;
    }
    let body_start = after_len_digits + 2;
    // `body_len` comes straight from the input, so unchecked addition could
    // overflow. That panics in debug builds; in release it wraps to a bogus
    // (possibly backwards) `after`. Callers reject such lengths anyway, so
    // report them as unusable.
    let Some(body_end) = body_start.checked_add(body_len) else {
        return BulkStep::Incomplete;
    };
    let Some(after) = body_end.checked_add(2) else {
        return BulkStep::Incomplete;
    };
    match buf.get(body_end..after) {
        Some(terminator) if terminator == b"\r\n" => BulkStep::Complete { after },
        Some(_) => BulkStep::Incomplete,
        None => BulkStep::HeaderOnlyBigBody { body_len, after },
    }
}
/// Decides whether a partially received RESP command should be promoted to the
/// big-argument path.
///
/// Promotion is reported only when all of the following hold:
/// * `buf` starts with a multibulk header `*<argc>\r\n`, where `argc` is
///   between 2 and `MAX_PROBE_BULKS` inclusive;
/// * the verb is one of the supported commands (for `MSET`, `argc` must also be
///   odd and at least 3);
/// * every argument before the last is fully buffered;
/// * the last argument's header is present but its body is not;
/// * that body's declared length lies in
///   `BIG_ARG_PROMOTE_THRESHOLD..=MAX_BULK_LEN`.
///
/// On promotion, `total` is the full command length and `bytes_present` is
/// `buf.len()`. Every other case yields `NotApplicable`, including a command
/// that is already fully buffered.
pub(crate) fn probe_generic_bigbulk(buf: &[u8]) -> BigArgGenericProbe {
    /// Returns `(total, bytes_present)` when the buffered command qualifies.
    fn promotion_extent(buf: &[u8]) -> Option<(usize, usize)> {
        if buf.first() != Some(&b'*') {
            return None;
        }
        let (argc, argc_digits_end) = parse_decimal_at(buf, 1)?;
        if !(2..=MAX_PROBE_BULKS).contains(&argc) {
            return None;
        }
        if buf.get(argc_digits_end..argc_digits_end + 2)? != b"\r\n" {
            return None;
        }

        let (verb_buf, verb_len, mut cursor) = probe_verb_bulk(buf, argc_digits_end + 2)?;
        let verb = &verb_buf[..verb_len];
        if !generic_bigbulk_verb_supported(verb) {
            return None;
        }
        // MSET takes key/value pairs, so argc must be 1 + 2k with k >= 1.
        if verb == b"MSET" && (argc < 3 || argc % 2 == 0) {
            return None;
        }

        // Walk the arguments after the verb. All but the last must already be
        // complete; only the last may be a header whose body is still arriving.
        let last_idx = argc - 1;
        for idx in 1..argc {
            match step_bulk(buf, cursor) {
                BulkStep::Complete { after } => cursor = after,
                BulkStep::HeaderOnlyBigBody { body_len, after } => {
                    let big_enough = (BIG_ARG_PROMOTE_THRESHOLD..=MAX_BULK_LEN).contains(&body_len);
                    return (idx == last_idx && big_enough).then_some((after, buf.len()));
                }
                BulkStep::Incomplete => return None,
            }
        }
        // Whole command already buffered: nothing to promote.
        None
    }

    match promotion_extent(buf) {
        Some((total, bytes_present)) => BigArgGenericProbe::Promote { total, bytes_present },
        None => BigArgGenericProbe::NotApplicable,
    }
}
// Unit tests for this probe live in a separate file, wired in via `#[path]`.
#[cfg(test)]
#[path = "uring_bigbulk_probe_tests.rs"]
mod tests;