Skip to main content

reddb_server/storage/query/parser/
queue.rs

1//! Parser for QUEUE commands and CREATE/DROP QUEUE
2
3use super::super::ast::{
4    AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueAvailability, QueueCommand,
5    QueueMode, QueueSide, DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP, DEFAULT_QUEUE_LOCK_DEADLINE_MS,
6    DEFAULT_QUEUE_MAX_ATTEMPTS,
7};
8use super::super::lexer::Token;
9use super::error::ParseError;
10use super::Parser;
11
12impl<'a> Parser<'a> {
13    /// Parse CREATE QUEUE body (after CREATE QUEUE consumed)
14    pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
15        let if_not_exists = self.match_if_not_exists()?;
16        let name = self.expect_ident()?;
17
18        let mut mode = QueueMode::Work;
19        let mut priority = false;
20        let mut max_size = None;
21        let mut ttl_ms = None;
22        let mut dlq = None;
23        let mut max_attempts = DEFAULT_QUEUE_MAX_ATTEMPTS;
24        let mut lock_deadline_ms = DEFAULT_QUEUE_LOCK_DEADLINE_MS;
25        let mut in_flight_cap_per_group = DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP;
26        let mut retry_delay_ms: Option<u64> = None;
27
28        // Parse optional clauses in any order
29        loop {
30            if let Some(parsed_mode) = self.consume_queue_mode()? {
31                mode = parsed_mode;
32            } else if self.consume(&Token::Priority)? {
33                priority = true;
34            } else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
35                max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
36            } else if self.consume_ident_ci("MAX_ATTEMPTS")?
37                || self.consume_ident_ci("MAXATTEMPTS")?
38            {
39                max_attempts = self.parse_integer()?.max(1) as u32;
40            } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
41                lock_deadline_ms = self.parse_integer()?.max(1) as u64;
42            } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
43                in_flight_cap_per_group = self.parse_integer()?.max(1) as u32;
44            } else if self.consume_ident_ci("RETRY_DELAY")? {
45                let value = self.parse_float()?;
46                let unit = self.parse_queue_duration_unit()?;
47                retry_delay_ms = Some((value * unit).max(0.0) as u64);
48            } else if self.consume(&Token::With)? {
49                if self.consume_ident_ci("EVENTS")? {
50                    return Err(ParseError::new(
51                        "queues cannot have event subscriptions".to_string(),
52                        self.position(),
53                    ));
54                } else if self.consume_ident_ci("TTL")? {
55                    let value = self.parse_float()?;
56                    let unit = self.parse_queue_duration_unit()?;
57                    ttl_ms = Some((value * unit) as u64);
58                } else if self.consume_ident_ci("DLQ")? {
59                    dlq = Some(self.expect_ident()?);
60                }
61            } else {
62                break;
63            }
64        }
65
66        Ok(QueryExpr::CreateQueue(CreateQueueQuery {
67            name,
68            mode,
69            priority,
70            max_size,
71            ttl_ms,
72            dlq,
73            max_attempts,
74            lock_deadline_ms,
75            in_flight_cap_per_group,
76            if_not_exists,
77            retry_delay_ms,
78        }))
79    }
80
81    /// Parse ALTER QUEUE body (after ALTER QUEUE consumed)
82    ///
83    /// Syntax: `ALTER QUEUE <name> SET <clause>` where `<clause>` is one of
84    ///   `MODE <WORK|FANOUT>`
85    ///   `MAX_ATTEMPTS <int>`
86    ///   `LOCK_DEADLINE_MS <int>`
87    ///   `IN_FLIGHT_CAP_PER_GROUP <int>`
88    ///   `DLQ <ident>`
89    pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
90        let name = self.expect_ident()?;
91        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
92            return Err(ParseError::expected(
93                vec!["SET"],
94                self.peek(),
95                self.position(),
96            ));
97        }
98
99        let mut alter = AlterQueueQuery {
100            name,
101            ..Default::default()
102        };
103
104        if self.consume(&Token::Mode)? || self.consume_ident_ci("MODE")? {
105            alter.mode = Some(self.parse_queue_mode()?);
106        } else if self.consume_ident_ci("MAX_ATTEMPTS")? || self.consume_ident_ci("MAXATTEMPTS")? {
107            alter.max_attempts = Some(self.parse_integer()?.max(1) as u32);
108        } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
109            alter.lock_deadline_ms = Some(self.parse_integer()?.max(1) as u64);
110        } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
111            alter.in_flight_cap_per_group = Some(self.parse_integer()?.max(1) as u32);
112        } else if self.consume_ident_ci("DLQ")? {
113            alter.dlq = Some(self.expect_ident()?);
114        } else if self.consume_ident_ci("RETRY_DELAY")? {
115            let value = self.parse_float()?;
116            let unit = self.parse_queue_duration_unit()?;
117            alter.retry_delay_ms = Some((value * unit).max(0.0) as u64);
118        } else {
119            return Err(ParseError::expected(
120                vec![
121                    "MODE",
122                    "MAX_ATTEMPTS",
123                    "LOCK_DEADLINE_MS",
124                    "IN_FLIGHT_CAP_PER_GROUP",
125                    "DLQ",
126                    "RETRY_DELAY",
127                ],
128                self.peek(),
129                self.position(),
130            ));
131        }
132
133        Ok(QueryExpr::AlterQueue(alter))
134    }
135
136    /// Parse DROP QUEUE body (after DROP QUEUE consumed)
137    pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
138        let if_exists = self.match_if_exists()?;
139        let name = self.parse_drop_collection_name()?;
140        Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
141    }
142
143    /// Parse QUEUE subcommand (after QUEUE token consumed)
144    pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
145        self.expect(Token::Queue)?;
146
147        match self.peek().clone() {
148            Token::Push => {
149                self.advance()?;
150                let queue = self.expect_ident()?;
151                let value = self.parse_value()?;
152                let (priority, available) = self.parse_push_tail_clauses()?;
153                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
154                    queue,
155                    value,
156                    side: QueueSide::Right,
157                    priority,
158                    available,
159                }))
160            }
161            Token::Pop => {
162                self.advance()?;
163                let queue = self.expect_ident()?;
164                let count = if self.consume(&Token::Count)? {
165                    self.parse_integer()? as usize
166                } else {
167                    1
168                };
169                self.reject_wait_clause("POP")?;
170                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
171                    queue,
172                    side: QueueSide::Left,
173                    count,
174                }))
175            }
176            Token::Peek => {
177                self.advance()?;
178                let queue = self.expect_ident()?;
179                // Accept either `PEEK <q> COUNT <n>` or a bare `PEEK <q> <n>`.
180                // `consume` has a side effect (advances past COUNT), so the
181                // short-circuit order matters: only peek for a bare integer
182                // when the COUNT keyword was absent.
183                let count =
184                    if self.consume(&Token::Count)? || matches!(self.peek(), Token::Integer(_)) {
185                        self.parse_integer()? as usize
186                    } else {
187                        1
188                    };
189                self.reject_wait_clause("PEEK")?;
190                Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
191            }
192            Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
193                self.advance()?;
194                let queue = self.expect_ident()?;
195                Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
196            }
197            Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
198                self.advance()?;
199                self.expect(Token::From)?;
200                let source = self.expect_ident()?;
201                self.expect(Token::To)?;
202                let destination = self.expect_ident()?;
203                let filter = if self.consume(&Token::Where)? {
204                    Some(self.parse_filter()?)
205                } else {
206                    None
207                };
208                let limit = if self.consume(&Token::Limit)? {
209                    self.parse_positive_integer("LIMIT")? as usize
210                } else if filter.is_some() {
211                    return Err(ParseError::expected(
212                        vec!["LIMIT"],
213                        self.peek(),
214                        self.position(),
215                    ));
216                } else {
217                    1
218                };
219                Ok(QueryExpr::QueueCommand(QueueCommand::Move {
220                    source,
221                    destination,
222                    filter,
223                    limit,
224                }))
225            }
226            Token::Purge => {
227                self.advance()?;
228                let queue = self.expect_ident()?;
229                Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
230            }
231            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
232                self.advance()?;
233                let queue = self.expect_ident()?;
234                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
235                    queue,
236                    side: QueueSide::Left,
237                    count: 1,
238                }))
239            }
240            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
241                self.advance()?;
242                let queue = self.expect_ident()?;
243                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
244                    queue,
245                    side: QueueSide::Right,
246                    count: 1,
247                }))
248            }
249            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
250                self.advance()?;
251                let queue = self.expect_ident()?;
252                let value = self.parse_value()?;
253                let (_priority, available) = self.parse_push_tail_clauses()?;
254                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
255                    queue,
256                    value,
257                    side: QueueSide::Left,
258                    priority: None,
259                    available,
260                }))
261            }
262            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
263                self.advance()?;
264                let queue = self.expect_ident()?;
265                let value = self.parse_value()?;
266                let (priority, available) = self.parse_push_tail_clauses()?;
267                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
268                    queue,
269                    value,
270                    side: QueueSide::Right,
271                    priority,
272                    available,
273                }))
274            }
275            Token::Group => {
276                self.advance()?;
277                self.expect(Token::Create)?;
278                let queue = self.expect_ident()?;
279                let group = self.expect_ident()?;
280                Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
281                    queue,
282                    group,
283                }))
284            }
285            Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
286                self.advance()?;
287                let queue = self.expect_ident()?;
288                let group = if self.consume(&Token::Group)? {
289                    Some(self.expect_ident()?)
290                } else {
291                    None
292                };
293                // CONSUMER consumer_name
294                if !self.consume_ident_ci("CONSUMER")? {
295                    return Err(ParseError::expected(
296                        vec!["CONSUMER"],
297                        self.peek(),
298                        self.position(),
299                    ));
300                }
301                let consumer = self.expect_ident()?;
302                let count = if self.consume(&Token::Count)? {
303                    self.parse_integer()? as usize
304                } else {
305                    1
306                };
307                let wait_ms = self.parse_optional_wait_clause()?;
308                Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
309                    queue,
310                    group,
311                    consumer,
312                    count,
313                    wait_ms,
314                }))
315            }
316            Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
317                self.advance()?;
318                let queue = self.expect_ident()?;
319                self.expect(Token::Group)?;
320                let group = self.expect_ident()?;
321                Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
322                    queue,
323                    group,
324                }))
325            }
326            Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
327                self.advance()?;
328                let queue = self.expect_ident()?;
329                self.expect(Token::Group)?;
330                let group = self.expect_ident()?;
331                if !self.consume_ident_ci("CONSUMER")? {
332                    return Err(ParseError::expected(
333                        vec!["CONSUMER"],
334                        self.peek(),
335                        self.position(),
336                    ));
337                }
338                let consumer = self.expect_ident()?;
339                if !self.consume_ident_ci("MIN_IDLE")? {
340                    return Err(ParseError::expected(
341                        vec!["MIN_IDLE"],
342                        self.peek(),
343                        self.position(),
344                    ));
345                }
346                let min_idle_ms = self.parse_integer()?.max(0) as u64;
347                Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
348                    queue,
349                    group,
350                    consumer,
351                    min_idle_ms,
352                }))
353            }
354            Token::Ack => {
355                self.advance()?;
356                let queue = self.expect_ident()?;
357                let (group, message_id, delivery_id, _delay_ms) =
358                    self.parse_ack_nack_handle(false)?;
359                Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
360                    queue,
361                    group,
362                    message_id,
363                    delivery_id,
364                }))
365            }
366            Token::Nack => {
367                self.advance()?;
368                let queue = self.expect_ident()?;
369                let (group, message_id, delivery_id, delay_ms) =
370                    self.parse_ack_nack_handle(true)?;
371                Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
372                    queue,
373                    group,
374                    message_id,
375                    delivery_id,
376                    delay_ms,
377                }))
378            }
379            _ => Err(ParseError::expected(
380                vec![
381                    "PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
382                    "RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
383                ],
384                self.peek(),
385                self.position(),
386            )),
387        }
388    }
389
390    /// Parse the tuple/`delivery_id` handle that follows `ACK <queue>` /
391    /// `NACK <queue>`. Returns `(group, message_id, delivery_id)`:
392    ///
393    /// - `GROUP <group> '<message_id>'`               → legacy tuple, no delivery_id
394    /// - `GROUP <group> '<message_id>' WITH delivery_id = '<base32>'` → both, delivery_id wins (ADR 0026)
395    /// - `WITH delivery_id = '<base32>'`              → delivery_id only; group / message_id empty
396    ///
397    /// Refusing both is a parse error — at least one handle is required.
398    fn parse_ack_nack_handle(
399        &mut self,
400        allow_delay: bool,
401    ) -> Result<(String, String, Option<String>, Option<u64>), ParseError> {
402        let (group, message_id) = if matches!(self.peek(), Token::Group) {
403            self.advance()?;
404            let group = self.expect_ident()?;
405            let message_id = self.parse_string()?;
406            (group, message_id)
407        } else {
408            (String::new(), String::new())
409        };
410        // After the optional `GROUP <g> '<mid>'`, we can have any
411        // combination of `WITH delivery_id = '<id>'` and (for NACK only)
412        // `WITH DELAY <duration>` — in any order. Each clause may appear
413        // at most once. A bare `WITH` followed by neither keyword is a
414        // parse error.
415        let mut delivery_id: Option<String> = None;
416        let mut delay_ms: Option<u64> = None;
417        while self.consume(&Token::With)? {
418            if self.consume_ident_ci("delivery_id")? {
419                if delivery_id.is_some() {
420                    return Err(ParseError::new(
421                        "duplicate WITH delivery_id clause".to_string(),
422                        self.position(),
423                    ));
424                }
425                if !self.consume(&Token::Eq)? {
426                    return Err(ParseError::expected(
427                        vec!["="],
428                        self.peek(),
429                        self.position(),
430                    ));
431                }
432                delivery_id = Some(self.parse_string()?);
433            } else if allow_delay && self.consume_ident_ci("DELAY")? {
434                if delay_ms.is_some() {
435                    return Err(ParseError::new(
436                        "duplicate WITH DELAY clause".to_string(),
437                        self.position(),
438                    ));
439                }
440                let value = self.parse_float()?;
441                let unit = self.parse_queue_duration_unit()?;
442                delay_ms = Some((value * unit).max(0.0) as u64);
443            } else {
444                let mut expected = vec!["delivery_id"];
445                if allow_delay {
446                    expected.push("DELAY");
447                }
448                return Err(ParseError::expected(expected, self.peek(), self.position()));
449            }
450        }
451        if group.is_empty() && delivery_id.is_none() {
452            let mut expected = vec!["GROUP", "WITH delivery_id"];
453            if allow_delay {
454                expected.push("WITH DELAY");
455            }
456            return Err(ParseError::expected(expected, self.peek(), self.position()));
457        }
458        Ok((group, message_id, delivery_id, delay_ms))
459    }
460
461    /// Parse the optional `PRIORITY <int>`, `DELAY <duration>`, and
462    /// `AVAILABLE AT <unix_ms>` tail of a `QUEUE PUSH` / `RPUSH` / `LPUSH`
463    /// (issue #722). Clauses may appear in any order; each may appear at
464    /// most once. `AVAILABLE AT` and `DELAY` are mutually exclusive — both
465    /// produce a per-message availability instant, so accepting both
466    /// would force the parser to pick a winner silently.
467    fn parse_push_tail_clauses(
468        &mut self,
469    ) -> Result<(Option<i32>, Option<QueueAvailability>), ParseError> {
470        let mut priority: Option<i32> = None;
471        let mut available: Option<QueueAvailability> = None;
472        loop {
473            if self.consume(&Token::Priority)? {
474                if priority.is_some() {
475                    return Err(ParseError::new(
476                        "duplicate PRIORITY clause".to_string(),
477                        self.position(),
478                    ));
479                }
480                priority = Some(self.parse_integer()? as i32);
481            } else if self.peek_ident_ci("DELAY") {
482                self.advance()?;
483                if available.is_some() {
484                    return Err(ParseError::new(
485                        "QUEUE PUSH accepts at most one of DELAY / AVAILABLE AT".to_string(),
486                        self.position(),
487                    ));
488                }
489                let value = self.parse_float()?;
490                let unit = self.parse_queue_duration_unit()?;
491                available = Some(QueueAvailability::DelayMs((value * unit).max(0.0) as u64));
492            } else if self.peek_ident_ci("AVAILABLE") {
493                self.advance()?;
494                if !self.consume_ident_ci("AT")? {
495                    return Err(ParseError::expected(
496                        vec!["AT"],
497                        self.peek(),
498                        self.position(),
499                    ));
500                }
501                if available.is_some() {
502                    return Err(ParseError::new(
503                        "QUEUE PUSH accepts at most one of DELAY / AVAILABLE AT".to_string(),
504                        self.position(),
505                    ));
506                }
507                let unix_ms = self.parse_integer()?.max(0) as u64;
508                available = Some(QueueAvailability::AtUnixMs(unix_ms));
509            } else {
510                break;
511            }
512        }
513        Ok((priority, available))
514    }
515
516    /// Parse an optional `WAIT <duration>` tail (slice A of PRD #718).
517    ///
518    /// Returns `None` if the next token is not the `WAIT` keyword. When
519    /// `WAIT` is present, a duration literal is mandatory — a bare
520    /// `WAIT` with no number errors with a clear "expected: number"
521    /// message bubbled up from `parse_float`.
522    fn parse_optional_wait_clause(&mut self) -> Result<Option<u64>, ParseError> {
523        if !self.peek_ident_ci("WAIT") {
524            return Ok(None);
525        }
526        self.advance()?;
527        let value = self.parse_float()?;
528        let unit = self.parse_queue_duration_unit()?;
529        Ok(Some((value * unit).max(0.0) as u64))
530    }
531
532    /// Reject a stray `WAIT …` tail after a command form that does not
533    /// support blocking reads (`POP`, `PEEK`). The error names the
534    /// command so the operator sees why their statement was refused.
535    fn reject_wait_clause(&mut self, command: &'static str) -> Result<(), ParseError> {
536        if self.peek_ident_ci("WAIT") {
537            return Err(ParseError::new(
538                format!("QUEUE {command} does not support WAIT; use QUEUE READ … WAIT <duration>"),
539                self.position(),
540            ));
541        }
542        Ok(())
543    }
544
545    fn peek_ident_ci(&self, name: &str) -> bool {
546        matches!(self.peek(), Token::Ident(id) if id.eq_ignore_ascii_case(name))
547    }
548
549    fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
550        match self.peek() {
551            Token::Work => {
552                self.advance()?;
553                Ok(Some(QueueMode::Work))
554            }
555            Token::Ident(name) => {
556                if let Some(mode) = QueueMode::parse(name) {
557                    self.advance()?;
558                    Ok(Some(mode))
559                } else {
560                    Ok(None)
561                }
562            }
563            _ => Ok(None),
564        }
565    }
566
567    fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
568        match self.consume_queue_mode()? {
569            Some(mode) => Ok(mode),
570            None => Err(ParseError::expected(
571                vec!["FANOUT", "WORK"],
572                self.peek(),
573                self.position(),
574            )),
575        }
576    }
577
578    /// Parse duration unit for queue TTL
579    fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
580        match self.peek().clone() {
581            Token::Ident(ref unit) => {
582                let mult = match unit.to_ascii_lowercase().as_str() {
583                    "ms" => 1.0,
584                    "s" | "sec" | "secs" => 1_000.0,
585                    "m" | "min" | "mins" => 60_000.0,
586                    "h" | "hr" | "hrs" => 3_600_000.0,
587                    "d" | "day" | "days" => 86_400_000.0,
588                    _ => return Ok(1_000.0),
589                };
590                self.advance()?;
591                Ok(mult)
592            }
593            _ => Ok(1_000.0),
594        }
595    }
596}