use super::super::ast::{
AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueAvailability, QueueCommand,
QueueMode, QueueSide, DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP, DEFAULT_QUEUE_LOCK_DEADLINE_MS,
DEFAULT_QUEUE_MAX_ATTEMPTS,
};
use super::super::lexer::Token;
use super::error::ParseError;
use super::Parser;
impl<'a> Parser<'a> {
pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
let if_not_exists = self.match_if_not_exists()?;
let name = self.expect_ident()?;
let mut mode = QueueMode::Work;
let mut priority = false;
let mut max_size = None;
let mut ttl_ms = None;
let mut dlq = None;
let mut max_attempts = DEFAULT_QUEUE_MAX_ATTEMPTS;
let mut lock_deadline_ms = DEFAULT_QUEUE_LOCK_DEADLINE_MS;
let mut in_flight_cap_per_group = DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP;
let mut retry_delay_ms: Option<u64> = None;
loop {
if let Some(parsed_mode) = self.consume_queue_mode()? {
mode = parsed_mode;
} else if self.consume(&Token::Priority)? {
priority = true;
} else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
} else if self.consume_ident_ci("MAX_ATTEMPTS")?
|| self.consume_ident_ci("MAXATTEMPTS")?
{
max_attempts = self.parse_integer()?.max(1) as u32;
} else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
lock_deadline_ms = self.parse_integer()?.max(1) as u64;
} else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
in_flight_cap_per_group = self.parse_integer()?.max(1) as u32;
} else if self.consume_ident_ci("RETRY_DELAY")? {
let value = self.parse_float()?;
let unit = self.parse_queue_duration_unit()?;
retry_delay_ms = Some((value * unit).max(0.0) as u64);
} else if self.consume(&Token::With)? {
if self.consume_ident_ci("EVENTS")? {
return Err(ParseError::new(
"queues cannot have event subscriptions".to_string(),
self.position(),
));
} else if self.consume_ident_ci("TTL")? {
let value = self.parse_float()?;
let unit = self.parse_queue_duration_unit()?;
ttl_ms = Some((value * unit) as u64);
} else if self.consume_ident_ci("DLQ")? {
dlq = Some(self.expect_ident()?);
}
} else {
break;
}
}
Ok(QueryExpr::CreateQueue(CreateQueueQuery {
name,
mode,
priority,
max_size,
ttl_ms,
dlq,
max_attempts,
lock_deadline_ms,
in_flight_cap_per_group,
if_not_exists,
retry_delay_ms,
}))
}
pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
let name = self.expect_ident()?;
if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
return Err(ParseError::expected(
vec!["SET"],
self.peek(),
self.position(),
));
}
let mut alter = AlterQueueQuery {
name,
..Default::default()
};
if self.consume(&Token::Mode)? || self.consume_ident_ci("MODE")? {
alter.mode = Some(self.parse_queue_mode()?);
} else if self.consume_ident_ci("MAX_ATTEMPTS")? || self.consume_ident_ci("MAXATTEMPTS")? {
alter.max_attempts = Some(self.parse_integer()?.max(1) as u32);
} else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
alter.lock_deadline_ms = Some(self.parse_integer()?.max(1) as u64);
} else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
alter.in_flight_cap_per_group = Some(self.parse_integer()?.max(1) as u32);
} else if self.consume_ident_ci("DLQ")? {
alter.dlq = Some(self.expect_ident()?);
} else if self.consume_ident_ci("RETRY_DELAY")? {
let value = self.parse_float()?;
let unit = self.parse_queue_duration_unit()?;
alter.retry_delay_ms = Some((value * unit).max(0.0) as u64);
} else {
return Err(ParseError::expected(
vec![
"MODE",
"MAX_ATTEMPTS",
"LOCK_DEADLINE_MS",
"IN_FLIGHT_CAP_PER_GROUP",
"DLQ",
"RETRY_DELAY",
],
self.peek(),
self.position(),
));
}
Ok(QueryExpr::AlterQueue(alter))
}
pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
let if_exists = self.match_if_exists()?;
let name = self.parse_drop_collection_name()?;
Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
}
pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
self.expect(Token::Queue)?;
match self.peek().clone() {
Token::Push => {
self.advance()?;
let queue = self.expect_ident()?;
let value = self.parse_value()?;
let (priority, available) = self.parse_push_tail_clauses()?;
Ok(QueryExpr::QueueCommand(QueueCommand::Push {
queue,
value,
side: QueueSide::Right,
priority,
available,
}))
}
Token::Pop => {
self.advance()?;
let queue = self.expect_ident()?;
let count = if self.consume(&Token::Count)? {
self.parse_integer()? as usize
} else {
1
};
self.reject_wait_clause("POP")?;
Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
queue,
side: QueueSide::Left,
count,
}))
}
Token::Peek => {
self.advance()?;
let queue = self.expect_ident()?;
let count =
if self.consume(&Token::Count)? || matches!(self.peek(), Token::Integer(_)) {
self.parse_integer()? as usize
} else {
1
};
self.reject_wait_clause("PEEK")?;
Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
}
Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
self.advance()?;
let queue = self.expect_ident()?;
Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
}
Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
self.advance()?;
self.expect(Token::From)?;
let source = self.expect_ident()?;
self.expect(Token::To)?;
let destination = self.expect_ident()?;
let filter = if self.consume(&Token::Where)? {
Some(self.parse_filter()?)
} else {
None
};
let limit = if self.consume(&Token::Limit)? {
self.parse_positive_integer("LIMIT")? as usize
} else if filter.is_some() {
return Err(ParseError::expected(
vec!["LIMIT"],
self.peek(),
self.position(),
));
} else {
1
};
Ok(QueryExpr::QueueCommand(QueueCommand::Move {
source,
destination,
filter,
limit,
}))
}
Token::Purge => {
self.advance()?;
let queue = self.expect_ident()?;
Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
}
Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
self.advance()?;
let queue = self.expect_ident()?;
Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
queue,
side: QueueSide::Left,
count: 1,
}))
}
Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
self.advance()?;
let queue = self.expect_ident()?;
Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
queue,
side: QueueSide::Right,
count: 1,
}))
}
Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
self.advance()?;
let queue = self.expect_ident()?;
let value = self.parse_value()?;
let (_priority, available) = self.parse_push_tail_clauses()?;
Ok(QueryExpr::QueueCommand(QueueCommand::Push {
queue,
value,
side: QueueSide::Left,
priority: None,
available,
}))
}
Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
self.advance()?;
let queue = self.expect_ident()?;
let value = self.parse_value()?;
let (priority, available) = self.parse_push_tail_clauses()?;
Ok(QueryExpr::QueueCommand(QueueCommand::Push {
queue,
value,
side: QueueSide::Right,
priority,
available,
}))
}
Token::Group => {
self.advance()?;
self.expect(Token::Create)?;
let queue = self.expect_ident()?;
let group = self.expect_ident()?;
Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
queue,
group,
}))
}
Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
self.advance()?;
let queue = self.expect_ident()?;
let group = if self.consume(&Token::Group)? {
Some(self.expect_ident()?)
} else {
None
};
if !self.consume_ident_ci("CONSUMER")? {
return Err(ParseError::expected(
vec!["CONSUMER"],
self.peek(),
self.position(),
));
}
let consumer = self.expect_ident()?;
let count = if self.consume(&Token::Count)? {
self.parse_integer()? as usize
} else {
1
};
let wait_ms = self.parse_optional_wait_clause()?;
Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
queue,
group,
consumer,
count,
wait_ms,
}))
}
Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
self.advance()?;
let queue = self.expect_ident()?;
self.expect(Token::Group)?;
let group = self.expect_ident()?;
Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
queue,
group,
}))
}
Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
self.advance()?;
let queue = self.expect_ident()?;
self.expect(Token::Group)?;
let group = self.expect_ident()?;
if !self.consume_ident_ci("CONSUMER")? {
return Err(ParseError::expected(
vec!["CONSUMER"],
self.peek(),
self.position(),
));
}
let consumer = self.expect_ident()?;
if !self.consume_ident_ci("MIN_IDLE")? {
return Err(ParseError::expected(
vec!["MIN_IDLE"],
self.peek(),
self.position(),
));
}
let min_idle_ms = self.parse_integer()?.max(0) as u64;
Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
queue,
group,
consumer,
min_idle_ms,
}))
}
Token::Ack => {
self.advance()?;
let queue = self.expect_ident()?;
let (group, message_id, delivery_id, _delay_ms) =
self.parse_ack_nack_handle(false)?;
Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
queue,
group,
message_id,
delivery_id,
}))
}
Token::Nack => {
self.advance()?;
let queue = self.expect_ident()?;
let (group, message_id, delivery_id, delay_ms) =
self.parse_ack_nack_handle(true)?;
Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
queue,
group,
message_id,
delivery_id,
delay_ms,
}))
}
_ => Err(ParseError::expected(
vec![
"PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
"RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
],
self.peek(),
self.position(),
)),
}
}
fn parse_ack_nack_handle(
&mut self,
allow_delay: bool,
) -> Result<(String, String, Option<String>, Option<u64>), ParseError> {
let (group, message_id) = if matches!(self.peek(), Token::Group) {
self.advance()?;
let group = self.expect_ident()?;
let message_id = self.parse_string()?;
(group, message_id)
} else {
(String::new(), String::new())
};
let mut delivery_id: Option<String> = None;
let mut delay_ms: Option<u64> = None;
while self.consume(&Token::With)? {
if self.consume_ident_ci("delivery_id")? {
if delivery_id.is_some() {
return Err(ParseError::new(
"duplicate WITH delivery_id clause".to_string(),
self.position(),
));
}
if !self.consume(&Token::Eq)? {
return Err(ParseError::expected(
vec!["="],
self.peek(),
self.position(),
));
}
delivery_id = Some(self.parse_string()?);
} else if allow_delay && self.consume_ident_ci("DELAY")? {
if delay_ms.is_some() {
return Err(ParseError::new(
"duplicate WITH DELAY clause".to_string(),
self.position(),
));
}
let value = self.parse_float()?;
let unit = self.parse_queue_duration_unit()?;
delay_ms = Some((value * unit).max(0.0) as u64);
} else {
let mut expected = vec!["delivery_id"];
if allow_delay {
expected.push("DELAY");
}
return Err(ParseError::expected(expected, self.peek(), self.position()));
}
}
if group.is_empty() && delivery_id.is_none() {
let mut expected = vec!["GROUP", "WITH delivery_id"];
if allow_delay {
expected.push("WITH DELAY");
}
return Err(ParseError::expected(expected, self.peek(), self.position()));
}
Ok((group, message_id, delivery_id, delay_ms))
}
fn parse_push_tail_clauses(
&mut self,
) -> Result<(Option<i32>, Option<QueueAvailability>), ParseError> {
let mut priority: Option<i32> = None;
let mut available: Option<QueueAvailability> = None;
loop {
if self.consume(&Token::Priority)? {
if priority.is_some() {
return Err(ParseError::new(
"duplicate PRIORITY clause".to_string(),
self.position(),
));
}
priority = Some(self.parse_integer()? as i32);
} else if self.peek_ident_ci("DELAY") {
self.advance()?;
if available.is_some() {
return Err(ParseError::new(
"QUEUE PUSH accepts at most one of DELAY / AVAILABLE AT".to_string(),
self.position(),
));
}
let value = self.parse_float()?;
let unit = self.parse_queue_duration_unit()?;
available = Some(QueueAvailability::DelayMs((value * unit).max(0.0) as u64));
} else if self.peek_ident_ci("AVAILABLE") {
self.advance()?;
if !self.consume_ident_ci("AT")? {
return Err(ParseError::expected(
vec!["AT"],
self.peek(),
self.position(),
));
}
if available.is_some() {
return Err(ParseError::new(
"QUEUE PUSH accepts at most one of DELAY / AVAILABLE AT".to_string(),
self.position(),
));
}
let unix_ms = self.parse_integer()?.max(0) as u64;
available = Some(QueueAvailability::AtUnixMs(unix_ms));
} else {
break;
}
}
Ok((priority, available))
}
fn parse_optional_wait_clause(&mut self) -> Result<Option<u64>, ParseError> {
if !self.peek_ident_ci("WAIT") {
return Ok(None);
}
self.advance()?;
let value = self.parse_float()?;
let unit = self.parse_queue_duration_unit()?;
Ok(Some((value * unit).max(0.0) as u64))
}
fn reject_wait_clause(&mut self, command: &'static str) -> Result<(), ParseError> {
if self.peek_ident_ci("WAIT") {
return Err(ParseError::new(
format!("QUEUE {command} does not support WAIT; use QUEUE READ … WAIT <duration>"),
self.position(),
));
}
Ok(())
}
fn peek_ident_ci(&self, name: &str) -> bool {
matches!(self.peek(), Token::Ident(id) if id.eq_ignore_ascii_case(name))
}
fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
match self.peek() {
Token::Work => {
self.advance()?;
Ok(Some(QueueMode::Work))
}
Token::Ident(name) => {
if let Some(mode) = QueueMode::parse(name) {
self.advance()?;
Ok(Some(mode))
} else {
Ok(None)
}
}
_ => Ok(None),
}
}
fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
match self.consume_queue_mode()? {
Some(mode) => Ok(mode),
None => Err(ParseError::expected(
vec!["FANOUT", "WORK"],
self.peek(),
self.position(),
)),
}
}
fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
match self.peek().clone() {
Token::Ident(ref unit) => {
let mult = match unit.to_ascii_lowercase().as_str() {
"ms" => 1.0,
"s" | "sec" | "secs" => 1_000.0,
"m" | "min" | "mins" => 60_000.0,
"h" | "hr" | "hrs" => 3_600_000.0,
"d" | "day" | "days" => 86_400_000.0,
_ => return Ok(1_000.0),
};
self.advance()?;
Ok(mult)
}
_ => Ok(1_000.0),
}
}
}