// Test-support modules. `parser_hardening` lives in its own file and supplies the
// harness items imported below (`HardenedParser`, `assert_no_panic_on`, the
// adversarial corpus and the queue grammar strategies).
mod support {
pub mod parser_hardening;
}
use proptest::prelude::*;
use reddb_server::storage::query::parser::{self, ParseError, ParserLimits};
use support::parser_hardening::{
self as harness, assert_no_panic_on, corpus::queue_adversarial_inputs, queue_grammar,
HardenedParser,
};
/// Marker type that plugs the query parser into the hardening harness through its
/// `HardenedParser` implementation; it carries no data.
pub struct QueueParser;
/// Harness adapter: both entry points report only whether parsing succeeded.
impl HardenedParser for QueueParser {
    type Error = ParseError;

    /// Runs `parser::parse` on `input` and discards the parsed value.
    fn parse(input: &str) -> Result<(), Self::Error> {
        let _ = parser::parse(input)?;
        Ok(())
    }

    /// Builds a parser with the supplied `limits`, runs it, and discards the parsed value.
    ///
    /// Errors from constructing the parser and from parsing are both propagated with `?`.
    fn parse_with_limits(input: &str, limits: ParserLimits) -> Result<(), Self::Error> {
        let mut limited = parser::Parser::with_limits(input, limits)?;
        let _ = limited.parse()?;
        Ok(())
    }
}
/// Feeds every adversarial corpus entry to the harness on a dedicated thread with an
/// 8 MiB stack and fails, naming the entry, as soon as one of them panics.
#[test]
fn queue_parser_does_not_panic_on_adversarial_corpus() {
    const CORPUS_STACK_BYTES: usize = 8 * 1024 * 1024;

    // Each entry is checked under `catch_unwind` so the failure message can name it.
    let run_corpus = || {
        for (name, input) in queue_adversarial_inputs() {
            let guarded = std::panic::AssertUnwindSafe(|| {
                assert_no_panic_on::<QueueParser>(&input);
            });
            if std::panic::catch_unwind(guarded).is_err() {
                panic!("queue adversarial corpus entry {} panicked", name);
            }
        }
    };

    std::thread::Builder::new()
        .stack_size(CORPUS_STACK_BYTES)
        .spawn(run_corpus)
        .expect("spawn corpus thread")
        .join()
        .expect("corpus thread panic");
}
proptest! {
    // 256 cases per property and at most 64 shrink iterations; everything else is default.
    #![proptest_config(ProptestConfig {
        max_shrink_iters: 64,
        cases: 256,
        ..ProptestConfig::default()
    })]

    // Generated CREATE QUEUE statements pass the harness roundtrip property and parse.
    #[test]
    fn proptest_create_queue_roundtrips(stmt in queue_grammar::create_queue_stmt()) {
        harness::roundtrip_property::<QueueParser>(&stmt);
        let parsed = QueueParser::parse(&stmt);
        prop_assert!(parsed.is_ok(), "create queue did not parse: {}", stmt);
    }

    // Generated QUEUE PUSH statements pass the harness roundtrip property and parse.
    #[test]
    fn proptest_queue_push_roundtrips(stmt in queue_grammar::queue_push_stmt()) {
        harness::roundtrip_property::<QueueParser>(&stmt);
        let parsed = QueueParser::parse(&stmt);
        prop_assert!(parsed.is_ok(), "queue push did not parse: {}", stmt);
    }

    // Generated QUEUE POP statements pass the harness roundtrip property and parse.
    #[test]
    fn proptest_queue_pop_roundtrips(stmt in queue_grammar::queue_pop_stmt()) {
        harness::roundtrip_property::<QueueParser>(&stmt);
        let parsed = QueueParser::parse(&stmt);
        prop_assert!(parsed.is_ok(), "queue pop did not parse: {}", stmt);
    }

    // Generated priority-modifier statements pass the harness roundtrip property and parse.
    #[test]
    fn proptest_priority_modifier_roundtrips(stmt in queue_grammar::priority_modifier_stmt()) {
        harness::roundtrip_property::<QueueParser>(&stmt);
        let parsed = QueueParser::parse(&stmt);
        prop_assert!(parsed.is_ok(), "priority modifier did not parse: {}", stmt);
    }

    // Generated consumer-group statements pass the harness roundtrip property and parse.
    #[test]
    fn proptest_consumer_group_roundtrips(stmt in queue_grammar::consumer_group_stmt()) {
        harness::roundtrip_property::<QueueParser>(&stmt);
        let parsed = QueueParser::parse(&stmt);
        prop_assert!(parsed.is_ok(), "consumer group syntax did not parse: {}", stmt);
    }

    // A queue keyword prefix followed by up to 512 arbitrary characters is handed to the
    // harness roundtrip property; no parse outcome is asserted here.
    #[test]
    fn proptest_queue_arbitrary_suffix_no_panic(
        prefix in prop_oneof![
            Just(String::from("CREATE QUEUE ")),
            Just(String::from("DROP QUEUE ")),
            Just(String::from("QUEUE PUSH ")),
            Just(String::from("QUEUE POP ")),
            Just(String::from("QUEUE GROUP CREATE ")),
            Just(String::from("QUEUE READ ")),
            Just(String::from("QUEUE CLAIM ")),
        ],
        suffix in ".{0,512}",
    ) {
        let stmt = format!("{prefix}{suffix}");
        harness::roundtrip_property::<QueueParser>(&stmt);
    }

    // A push whose payload alone is 200..2000 bytes must be rejected when the parser is
    // limited to 64 input bytes.
    #[test]
    fn proptest_queue_push_input_size_limit_enforced(len in 200usize..2000) {
        let input = format!("QUEUE PUSH q '{}'", "x".repeat(len));
        let tight = ParserLimits {
            max_input_bytes: 64,
            ..ParserLimits::default()
        };
        let outcome = QueueParser::parse_with_limits(&input, tight);
        prop_assert!(outcome.is_err(), "oversized push payload must error");
    }
}
use reddb_server::storage::query::ast::{QueryExpr, QueueCommand, QueueSide};
use reddb_server::storage::queue::QueueMode;
/// Parses `input` and returns the `query` field of the result.
///
/// # Panics
///
/// Panics with the offending input and the parser error if parsing fails.
fn parse_query(input: &str) -> QueryExpr {
    match parser::parse(input) {
        Ok(parsed) => parsed.query,
        Err(e) => panic!("expected ok for {input:?}, got error: {e}"),
    }
}
/// `CREATE QUEUE` with `MAX_SIZE` and `PRIORITY` must yield a `CreateQueue` node carrying
/// both options, with the remaining fields at the values asserted below.
#[test]
fn create_queue_with_max_size_and_priority_parses() {
    let created = match parse_query("CREATE QUEUE tasks MAX_SIZE 1000 PRIORITY") {
        QueryExpr::CreateQueue(cq) => cq,
        other => panic!("expected CreateQueue, got {other:?}"),
    };

    assert_eq!(created.name, "tasks");
    assert_eq!(created.mode, QueueMode::Work);
    assert_eq!(created.max_size, Some(1000));
    assert!(created.priority);
    assert_eq!(created.max_attempts, 3);
    assert!(created.dlq.is_none());
}
/// `CREATE QUEUE tasks FANOUT` and `ALTER QUEUE tasks SET MODE WORK` must each produce a
/// node carrying the queue name and the requested mode.
#[test]
fn create_and_alter_queue_mode_parse() {
    let created = match parse_query("CREATE QUEUE tasks FANOUT") {
        QueryExpr::CreateQueue(cq) => cq,
        other => panic!("expected CreateQueue, got {other:?}"),
    };
    assert_eq!(created.name, "tasks");
    assert_eq!(created.mode, QueueMode::Fanout);

    let altered = match parse_query("ALTER QUEUE tasks SET MODE WORK") {
        QueryExpr::AlterQueue(aq) => aq,
        other => panic!("expected AlterQueue, got {other:?}"),
    };
    assert_eq!(altered.name, "tasks");
    assert_eq!(altered.mode, QueueMode::Work);
}
/// `WITH DLQ <name> MAX_ATTEMPTS <n>` must populate the dead-letter queue name and the
/// attempt limit on the `CreateQueue` node.
#[test]
fn create_queue_with_dlq_and_max_attempts_parses() {
    let created = match parse_query("CREATE QUEUE tasks WITH DLQ failed MAX_ATTEMPTS 5") {
        QueryExpr::CreateQueue(cq) => cq,
        other => panic!("expected CreateQueue, got {other:?}"),
    };

    assert_eq!(created.name, "tasks");
    assert_eq!(created.dlq.as_deref(), Some("failed"));
    assert_eq!(created.max_attempts, 5);
}
/// `CREATE QUEUE IF NOT EXISTS` must set `if_not_exists` and still capture the queue name.
#[test]
fn create_queue_if_not_exists_sets_flag() {
    let created = match parse_query("CREATE QUEUE IF NOT EXISTS tasks") {
        QueryExpr::CreateQueue(cq) => cq,
        other => panic!("expected CreateQueue, got {other:?}"),
    };

    assert!(created.if_not_exists, "IF NOT EXISTS flag must be set");
    assert_eq!(created.name, "tasks");
}
/// A plain `QUEUE PUSH` with a string payload must target the named queue, push on the
/// right side, and carry no priority.
#[test]
fn queue_push_string_payload_parses() {
    let (queue, side, priority) = match parse_query("QUEUE PUSH tasks 'hello world'") {
        QueryExpr::QueueCommand(QueueCommand::Push {
            queue,
            side,
            priority,
            ..
        }) => (queue, side, priority),
        other => panic!("expected QueueCommand::Push, got {other:?}"),
    };

    assert_eq!(queue, "tasks");
    assert_eq!(side, QueueSide::Right);
    assert_eq!(priority, None);
}
/// A trailing `PRIORITY 7` on `QUEUE PUSH` must surface as `Some(7)`.
#[test]
fn queue_push_with_priority_modifier_parses() {
    let priority = match parse_query("QUEUE PUSH tasks 'x' PRIORITY 7") {
        QueryExpr::QueueCommand(QueueCommand::Push { priority, .. }) => priority,
        other => panic!("expected QueueCommand::Push, got {other:?}"),
    };

    assert_eq!(priority, Some(7));
}
/// `QUEUE POP tasks COUNT 5` must yield a left-side pop of five items from `tasks`.
#[test]
fn queue_pop_with_count_parses() {
    let (queue, count, side) = match parse_query("QUEUE POP tasks COUNT 5") {
        QueryExpr::QueueCommand(QueueCommand::Pop { queue, count, side }) => (queue, count, side),
        other => panic!("expected QueueCommand::Pop, got {other:?}"),
    };

    assert_eq!(queue, "tasks");
    assert_eq!(count, 5);
    assert_eq!(side, QueueSide::Left);
}
/// The `LPUSH` alias must select the left side and the `RPOP` alias the right side.
#[test]
fn queue_lpush_rpop_aliases_set_side() {
    let push_side = match parse_query("QUEUE LPUSH tasks 'left'") {
        QueryExpr::QueueCommand(QueueCommand::Push { side, .. }) => side,
        other => panic!("expected Push, got {other:?}"),
    };
    assert_eq!(push_side, QueueSide::Left);

    let pop_side = match parse_query("QUEUE RPOP tasks") {
        QueryExpr::QueueCommand(QueueCommand::Pop { side, .. }) => side,
        other => panic!("expected Pop, got {other:?}"),
    };
    assert_eq!(pop_side, QueueSide::Right);
}
/// `QUEUE GROUP CREATE <queue> <group>` must capture both identifiers in order.
#[test]
fn queue_group_create_parses() {
    let (queue, group) = match parse_query("QUEUE GROUP CREATE tasks workers") {
        QueryExpr::QueueCommand(QueueCommand::GroupCreate { queue, group }) => (queue, group),
        other => panic!("expected GroupCreate, got {other:?}"),
    };

    assert_eq!(queue, "tasks");
    assert_eq!(group, "workers");
}
/// A fully specified `QUEUE CLAIM` must populate queue, group, consumer and `min_idle_ms`.
#[test]
fn queue_claim_full_shape_parses() {
    let statement = "QUEUE CLAIM tasks GROUP workers CONSUMER worker2 MIN_IDLE 60000";
    let (queue, group, consumer, min_idle_ms) = match parse_query(statement) {
        QueryExpr::QueueCommand(QueueCommand::Claim {
            queue,
            group,
            consumer,
            min_idle_ms,
        }) => (queue, group, consumer, min_idle_ms),
        other => panic!("expected Claim, got {other:?}"),
    };

    assert_eq!(queue, "tasks");
    assert_eq!(group, "workers");
    assert_eq!(consumer, "worker2");
    assert_eq!(min_idle_ms, 60_000);
}
/// `DROP QUEUE IF EXISTS` must yield a `DropQueue` node with the name and `if_exists` set.
#[test]
fn drop_queue_if_exists_parses() {
    let dropped = match parse_query("DROP QUEUE IF EXISTS tasks") {
        QueryExpr::DropQueue(dq) => dq,
        other => panic!("expected DropQueue, got {other:?}"),
    };

    assert_eq!(dropped.name, "tasks");
    assert!(dropped.if_exists, "IF EXISTS flag must propagate");
}