1use super::super::ast::{
4 AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueAvailability, QueueCommand,
5 QueueMode, QueueSide, DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP, DEFAULT_QUEUE_LOCK_DEADLINE_MS,
6 DEFAULT_QUEUE_MAX_ATTEMPTS,
7};
8use super::super::lexer::Token;
9use super::error::ParseError;
10use super::Parser;
11
12impl<'a> Parser<'a> {
13 pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
15 let if_not_exists = self.match_if_not_exists()?;
16 let name = self.expect_ident()?;
17
18 let mut mode = QueueMode::Work;
19 let mut priority = false;
20 let mut max_size = None;
21 let mut ttl_ms = None;
22 let mut dlq = None;
23 let mut max_attempts = DEFAULT_QUEUE_MAX_ATTEMPTS;
24 let mut lock_deadline_ms = DEFAULT_QUEUE_LOCK_DEADLINE_MS;
25 let mut in_flight_cap_per_group = DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP;
26 let mut retry_delay_ms: Option<u64> = None;
27
28 loop {
30 if let Some(parsed_mode) = self.consume_queue_mode()? {
31 mode = parsed_mode;
32 } else if self.consume(&Token::Priority)? {
33 priority = true;
34 } else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
35 max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
36 } else if self.consume_ident_ci("MAX_ATTEMPTS")?
37 || self.consume_ident_ci("MAXATTEMPTS")?
38 {
39 max_attempts = self.parse_integer()?.max(1) as u32;
40 } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
41 lock_deadline_ms = self.parse_integer()?.max(1) as u64;
42 } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
43 in_flight_cap_per_group = self.parse_integer()?.max(1) as u32;
44 } else if self.consume_ident_ci("RETRY_DELAY")? {
45 let value = self.parse_float()?;
46 let unit = self.parse_queue_duration_unit()?;
47 retry_delay_ms = Some((value * unit).max(0.0) as u64);
48 } else if self.consume(&Token::With)? {
49 if self.consume_ident_ci("EVENTS")? {
50 return Err(ParseError::new(
51 "queues cannot have event subscriptions".to_string(),
52 self.position(),
53 ));
54 } else if self.consume_ident_ci("TTL")? {
55 let value = self.parse_float()?;
56 let unit = self.parse_queue_duration_unit()?;
57 ttl_ms = Some((value * unit) as u64);
58 } else if self.consume_ident_ci("DLQ")? {
59 dlq = Some(self.expect_ident()?);
60 }
61 } else {
62 break;
63 }
64 }
65
66 Ok(QueryExpr::CreateQueue(CreateQueueQuery {
67 name,
68 mode,
69 priority,
70 max_size,
71 ttl_ms,
72 dlq,
73 max_attempts,
74 lock_deadline_ms,
75 in_flight_cap_per_group,
76 if_not_exists,
77 retry_delay_ms,
78 }))
79 }
80
81 pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
90 let name = self.expect_ident()?;
91 if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
92 return Err(ParseError::expected(
93 vec!["SET"],
94 self.peek(),
95 self.position(),
96 ));
97 }
98
99 let mut alter = AlterQueueQuery {
100 name,
101 ..Default::default()
102 };
103
104 if self.consume(&Token::Mode)? || self.consume_ident_ci("MODE")? {
105 alter.mode = Some(self.parse_queue_mode()?);
106 } else if self.consume_ident_ci("MAX_ATTEMPTS")? || self.consume_ident_ci("MAXATTEMPTS")? {
107 alter.max_attempts = Some(self.parse_integer()?.max(1) as u32);
108 } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
109 alter.lock_deadline_ms = Some(self.parse_integer()?.max(1) as u64);
110 } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
111 alter.in_flight_cap_per_group = Some(self.parse_integer()?.max(1) as u32);
112 } else if self.consume_ident_ci("DLQ")? {
113 alter.dlq = Some(self.expect_ident()?);
114 } else if self.consume_ident_ci("RETRY_DELAY")? {
115 let value = self.parse_float()?;
116 let unit = self.parse_queue_duration_unit()?;
117 alter.retry_delay_ms = Some((value * unit).max(0.0) as u64);
118 } else {
119 return Err(ParseError::expected(
120 vec![
121 "MODE",
122 "MAX_ATTEMPTS",
123 "LOCK_DEADLINE_MS",
124 "IN_FLIGHT_CAP_PER_GROUP",
125 "DLQ",
126 "RETRY_DELAY",
127 ],
128 self.peek(),
129 self.position(),
130 ));
131 }
132
133 Ok(QueryExpr::AlterQueue(alter))
134 }
135
136 pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
138 let if_exists = self.match_if_exists()?;
139 let name = self.parse_drop_collection_name()?;
140 Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
141 }
142
143 pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
145 self.expect(Token::Queue)?;
146
147 match self.peek().clone() {
148 Token::Push => {
149 self.advance()?;
150 let queue = self.expect_ident()?;
151 let value = self.parse_value()?;
152 let (priority, available) = self.parse_push_tail_clauses()?;
153 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
154 queue,
155 value,
156 side: QueueSide::Right,
157 priority,
158 available,
159 }))
160 }
161 Token::Pop => {
162 self.advance()?;
163 let queue = self.expect_ident()?;
164 let count = if self.consume(&Token::Count)? {
165 self.parse_integer()? as usize
166 } else {
167 1
168 };
169 self.reject_wait_clause("POP")?;
170 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
171 queue,
172 side: QueueSide::Left,
173 count,
174 }))
175 }
176 Token::Peek => {
177 self.advance()?;
178 let queue = self.expect_ident()?;
179 let count =
184 if self.consume(&Token::Count)? || matches!(self.peek(), Token::Integer(_)) {
185 self.parse_integer()? as usize
186 } else {
187 1
188 };
189 self.reject_wait_clause("PEEK")?;
190 Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
191 }
192 Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
193 self.advance()?;
194 let queue = self.expect_ident()?;
195 Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
196 }
197 Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
198 self.advance()?;
199 self.expect(Token::From)?;
200 let source = self.expect_ident()?;
201 self.expect(Token::To)?;
202 let destination = self.expect_ident()?;
203 let filter = if self.consume(&Token::Where)? {
204 Some(self.parse_filter()?)
205 } else {
206 None
207 };
208 let limit = if self.consume(&Token::Limit)? {
209 self.parse_positive_integer("LIMIT")? as usize
210 } else if filter.is_some() {
211 return Err(ParseError::expected(
212 vec!["LIMIT"],
213 self.peek(),
214 self.position(),
215 ));
216 } else {
217 1
218 };
219 Ok(QueryExpr::QueueCommand(QueueCommand::Move {
220 source,
221 destination,
222 filter,
223 limit,
224 }))
225 }
226 Token::Purge => {
227 self.advance()?;
228 let queue = self.expect_ident()?;
229 Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
230 }
231 Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
232 self.advance()?;
233 let queue = self.expect_ident()?;
234 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
235 queue,
236 side: QueueSide::Left,
237 count: 1,
238 }))
239 }
240 Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
241 self.advance()?;
242 let queue = self.expect_ident()?;
243 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
244 queue,
245 side: QueueSide::Right,
246 count: 1,
247 }))
248 }
249 Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
250 self.advance()?;
251 let queue = self.expect_ident()?;
252 let value = self.parse_value()?;
253 let (_priority, available) = self.parse_push_tail_clauses()?;
254 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
255 queue,
256 value,
257 side: QueueSide::Left,
258 priority: None,
259 available,
260 }))
261 }
262 Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
263 self.advance()?;
264 let queue = self.expect_ident()?;
265 let value = self.parse_value()?;
266 let (priority, available) = self.parse_push_tail_clauses()?;
267 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
268 queue,
269 value,
270 side: QueueSide::Right,
271 priority,
272 available,
273 }))
274 }
275 Token::Group => {
276 self.advance()?;
277 self.expect(Token::Create)?;
278 let queue = self.expect_ident()?;
279 let group = self.expect_ident()?;
280 Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
281 queue,
282 group,
283 }))
284 }
285 Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
286 self.advance()?;
287 let queue = self.expect_ident()?;
288 let group = if self.consume(&Token::Group)? {
289 Some(self.expect_ident()?)
290 } else {
291 None
292 };
293 if !self.consume_ident_ci("CONSUMER")? {
295 return Err(ParseError::expected(
296 vec!["CONSUMER"],
297 self.peek(),
298 self.position(),
299 ));
300 }
301 let consumer = self.expect_ident()?;
302 let count = if self.consume(&Token::Count)? {
303 self.parse_integer()? as usize
304 } else {
305 1
306 };
307 let wait_ms = self.parse_optional_wait_clause()?;
308 Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
309 queue,
310 group,
311 consumer,
312 count,
313 wait_ms,
314 }))
315 }
316 Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
317 self.advance()?;
318 let queue = self.expect_ident()?;
319 self.expect(Token::Group)?;
320 let group = self.expect_ident()?;
321 Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
322 queue,
323 group,
324 }))
325 }
326 Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
327 self.advance()?;
328 let queue = self.expect_ident()?;
329 self.expect(Token::Group)?;
330 let group = self.expect_ident()?;
331 if !self.consume_ident_ci("CONSUMER")? {
332 return Err(ParseError::expected(
333 vec!["CONSUMER"],
334 self.peek(),
335 self.position(),
336 ));
337 }
338 let consumer = self.expect_ident()?;
339 if !self.consume_ident_ci("MIN_IDLE")? {
340 return Err(ParseError::expected(
341 vec!["MIN_IDLE"],
342 self.peek(),
343 self.position(),
344 ));
345 }
346 let min_idle_ms = self.parse_integer()?.max(0) as u64;
347 Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
348 queue,
349 group,
350 consumer,
351 min_idle_ms,
352 }))
353 }
354 Token::Ack => {
355 self.advance()?;
356 let queue = self.expect_ident()?;
357 let (group, message_id, delivery_id, _delay_ms) =
358 self.parse_ack_nack_handle(false)?;
359 Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
360 queue,
361 group,
362 message_id,
363 delivery_id,
364 }))
365 }
366 Token::Nack => {
367 self.advance()?;
368 let queue = self.expect_ident()?;
369 let (group, message_id, delivery_id, delay_ms) =
370 self.parse_ack_nack_handle(true)?;
371 Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
372 queue,
373 group,
374 message_id,
375 delivery_id,
376 delay_ms,
377 }))
378 }
379 _ => Err(ParseError::expected(
380 vec![
381 "PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
382 "RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
383 ],
384 self.peek(),
385 self.position(),
386 )),
387 }
388 }
389
390 fn parse_ack_nack_handle(
399 &mut self,
400 allow_delay: bool,
401 ) -> Result<(String, String, Option<String>, Option<u64>), ParseError> {
402 let (group, message_id) = if matches!(self.peek(), Token::Group) {
403 self.advance()?;
404 let group = self.expect_ident()?;
405 let message_id = self.parse_string()?;
406 (group, message_id)
407 } else {
408 (String::new(), String::new())
409 };
410 let mut delivery_id: Option<String> = None;
416 let mut delay_ms: Option<u64> = None;
417 while self.consume(&Token::With)? {
418 if self.consume_ident_ci("delivery_id")? {
419 if delivery_id.is_some() {
420 return Err(ParseError::new(
421 "duplicate WITH delivery_id clause".to_string(),
422 self.position(),
423 ));
424 }
425 if !self.consume(&Token::Eq)? {
426 return Err(ParseError::expected(
427 vec!["="],
428 self.peek(),
429 self.position(),
430 ));
431 }
432 delivery_id = Some(self.parse_string()?);
433 } else if allow_delay && self.consume_ident_ci("DELAY")? {
434 if delay_ms.is_some() {
435 return Err(ParseError::new(
436 "duplicate WITH DELAY clause".to_string(),
437 self.position(),
438 ));
439 }
440 let value = self.parse_float()?;
441 let unit = self.parse_queue_duration_unit()?;
442 delay_ms = Some((value * unit).max(0.0) as u64);
443 } else {
444 let mut expected = vec!["delivery_id"];
445 if allow_delay {
446 expected.push("DELAY");
447 }
448 return Err(ParseError::expected(expected, self.peek(), self.position()));
449 }
450 }
451 if group.is_empty() && delivery_id.is_none() {
452 let mut expected = vec!["GROUP", "WITH delivery_id"];
453 if allow_delay {
454 expected.push("WITH DELAY");
455 }
456 return Err(ParseError::expected(expected, self.peek(), self.position()));
457 }
458 Ok((group, message_id, delivery_id, delay_ms))
459 }
460
461 fn parse_push_tail_clauses(
468 &mut self,
469 ) -> Result<(Option<i32>, Option<QueueAvailability>), ParseError> {
470 let mut priority: Option<i32> = None;
471 let mut available: Option<QueueAvailability> = None;
472 loop {
473 if self.consume(&Token::Priority)? {
474 if priority.is_some() {
475 return Err(ParseError::new(
476 "duplicate PRIORITY clause".to_string(),
477 self.position(),
478 ));
479 }
480 priority = Some(self.parse_integer()? as i32);
481 } else if self.peek_ident_ci("DELAY") {
482 self.advance()?;
483 if available.is_some() {
484 return Err(ParseError::new(
485 "QUEUE PUSH accepts at most one of DELAY / AVAILABLE AT".to_string(),
486 self.position(),
487 ));
488 }
489 let value = self.parse_float()?;
490 let unit = self.parse_queue_duration_unit()?;
491 available = Some(QueueAvailability::DelayMs((value * unit).max(0.0) as u64));
492 } else if self.peek_ident_ci("AVAILABLE") {
493 self.advance()?;
494 if !self.consume_ident_ci("AT")? {
495 return Err(ParseError::expected(
496 vec!["AT"],
497 self.peek(),
498 self.position(),
499 ));
500 }
501 if available.is_some() {
502 return Err(ParseError::new(
503 "QUEUE PUSH accepts at most one of DELAY / AVAILABLE AT".to_string(),
504 self.position(),
505 ));
506 }
507 let unix_ms = self.parse_integer()?.max(0) as u64;
508 available = Some(QueueAvailability::AtUnixMs(unix_ms));
509 } else {
510 break;
511 }
512 }
513 Ok((priority, available))
514 }
515
516 fn parse_optional_wait_clause(&mut self) -> Result<Option<u64>, ParseError> {
523 if !self.peek_ident_ci("WAIT") {
524 return Ok(None);
525 }
526 self.advance()?;
527 let value = self.parse_float()?;
528 let unit = self.parse_queue_duration_unit()?;
529 Ok(Some((value * unit).max(0.0) as u64))
530 }
531
532 fn reject_wait_clause(&mut self, command: &'static str) -> Result<(), ParseError> {
536 if self.peek_ident_ci("WAIT") {
537 return Err(ParseError::new(
538 format!("QUEUE {command} does not support WAIT; use QUEUE READ … WAIT <duration>"),
539 self.position(),
540 ));
541 }
542 Ok(())
543 }
544
545 fn peek_ident_ci(&self, name: &str) -> bool {
546 matches!(self.peek(), Token::Ident(id) if id.eq_ignore_ascii_case(name))
547 }
548
549 fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
550 match self.peek() {
551 Token::Work => {
552 self.advance()?;
553 Ok(Some(QueueMode::Work))
554 }
555 Token::Ident(name) => {
556 if let Some(mode) = QueueMode::parse(name) {
557 self.advance()?;
558 Ok(Some(mode))
559 } else {
560 Ok(None)
561 }
562 }
563 _ => Ok(None),
564 }
565 }
566
567 fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
568 match self.consume_queue_mode()? {
569 Some(mode) => Ok(mode),
570 None => Err(ParseError::expected(
571 vec!["FANOUT", "WORK"],
572 self.peek(),
573 self.position(),
574 )),
575 }
576 }
577
578 fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
580 match self.peek().clone() {
581 Token::Ident(ref unit) => {
582 let mult = match unit.to_ascii_lowercase().as_str() {
583 "ms" => 1.0,
584 "s" | "sec" | "secs" => 1_000.0,
585 "m" | "min" | "mins" => 60_000.0,
586 "h" | "hr" | "hrs" => 3_600_000.0,
587 "d" | "day" | "days" => 86_400_000.0,
588 _ => return Ok(1_000.0),
589 };
590 self.advance()?;
591 Ok(mult)
592 }
593 _ => Ok(1_000.0),
594 }
595 }
596}