Skip to main content

reddb_server/storage/query/parser/
queue.rs

//! Parser for QUEUE commands and CREATE/ALTER/DROP QUEUE statements
2
3use super::super::ast::{
4    AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueCommand, QueueMode,
5    QueueSide, DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP, DEFAULT_QUEUE_LOCK_DEADLINE_MS,
6    DEFAULT_QUEUE_MAX_ATTEMPTS,
7};
8use super::super::lexer::Token;
9use super::error::ParseError;
10use super::Parser;
11
12impl<'a> Parser<'a> {
13    /// Parse CREATE QUEUE body (after CREATE QUEUE consumed)
14    pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
15        let if_not_exists = self.match_if_not_exists()?;
16        let name = self.expect_ident()?;
17
18        let mut mode = QueueMode::Work;
19        let mut priority = false;
20        let mut max_size = None;
21        let mut ttl_ms = None;
22        let mut dlq = None;
23        let mut max_attempts = DEFAULT_QUEUE_MAX_ATTEMPTS;
24        let mut lock_deadline_ms = DEFAULT_QUEUE_LOCK_DEADLINE_MS;
25        let mut in_flight_cap_per_group = DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP;
26
27        // Parse optional clauses in any order
28        loop {
29            if let Some(parsed_mode) = self.consume_queue_mode()? {
30                mode = parsed_mode;
31            } else if self.consume(&Token::Priority)? {
32                priority = true;
33            } else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
34                max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
35            } else if self.consume_ident_ci("MAX_ATTEMPTS")?
36                || self.consume_ident_ci("MAXATTEMPTS")?
37            {
38                max_attempts = self.parse_integer()?.max(1) as u32;
39            } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
40                lock_deadline_ms = self.parse_integer()?.max(1) as u64;
41            } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
42                in_flight_cap_per_group = self.parse_integer()?.max(1) as u32;
43            } else if self.consume(&Token::With)? {
44                if self.consume_ident_ci("EVENTS")? {
45                    return Err(ParseError::new(
46                        "queues cannot have event subscriptions".to_string(),
47                        self.position(),
48                    ));
49                } else if self.consume_ident_ci("TTL")? {
50                    let value = self.parse_float()?;
51                    let unit = self.parse_queue_duration_unit()?;
52                    ttl_ms = Some((value * unit) as u64);
53                } else if self.consume_ident_ci("DLQ")? {
54                    dlq = Some(self.expect_ident()?);
55                }
56            } else {
57                break;
58            }
59        }
60
61        Ok(QueryExpr::CreateQueue(CreateQueueQuery {
62            name,
63            mode,
64            priority,
65            max_size,
66            ttl_ms,
67            dlq,
68            max_attempts,
69            lock_deadline_ms,
70            in_flight_cap_per_group,
71            if_not_exists,
72        }))
73    }
74
75    /// Parse ALTER QUEUE body (after ALTER QUEUE consumed)
76    ///
77    /// Syntax: `ALTER QUEUE <name> SET <clause>` where `<clause>` is one of
78    ///   `MODE <WORK|FANOUT>`
79    ///   `MAX_ATTEMPTS <int>`
80    ///   `LOCK_DEADLINE_MS <int>`
81    ///   `IN_FLIGHT_CAP_PER_GROUP <int>`
82    ///   `DLQ <ident>`
83    pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
84        let name = self.expect_ident()?;
85        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
86            return Err(ParseError::expected(
87                vec!["SET"],
88                self.peek(),
89                self.position(),
90            ));
91        }
92
93        let mut alter = AlterQueueQuery {
94            name,
95            ..Default::default()
96        };
97
98        if self.consume(&Token::Mode)? || self.consume_ident_ci("MODE")? {
99            alter.mode = Some(self.parse_queue_mode()?);
100        } else if self.consume_ident_ci("MAX_ATTEMPTS")? || self.consume_ident_ci("MAXATTEMPTS")? {
101            alter.max_attempts = Some(self.parse_integer()?.max(1) as u32);
102        } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
103            alter.lock_deadline_ms = Some(self.parse_integer()?.max(1) as u64);
104        } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
105            alter.in_flight_cap_per_group = Some(self.parse_integer()?.max(1) as u32);
106        } else if self.consume_ident_ci("DLQ")? {
107            alter.dlq = Some(self.expect_ident()?);
108        } else {
109            return Err(ParseError::expected(
110                vec![
111                    "MODE",
112                    "MAX_ATTEMPTS",
113                    "LOCK_DEADLINE_MS",
114                    "IN_FLIGHT_CAP_PER_GROUP",
115                    "DLQ",
116                ],
117                self.peek(),
118                self.position(),
119            ));
120        }
121
122        Ok(QueryExpr::AlterQueue(alter))
123    }
124
125    /// Parse DROP QUEUE body (after DROP QUEUE consumed)
126    pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
127        let if_exists = self.match_if_exists()?;
128        let name = self.parse_drop_collection_name()?;
129        Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
130    }
131
132    /// Parse QUEUE subcommand (after QUEUE token consumed)
133    pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
134        self.expect(Token::Queue)?;
135
136        match self.peek().clone() {
137            Token::Push => {
138                self.advance()?;
139                let queue = self.expect_ident()?;
140                let value = self.parse_value()?;
141                let priority = if self.consume(&Token::Priority)? {
142                    Some(self.parse_integer()? as i32)
143                } else {
144                    None
145                };
146                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
147                    queue,
148                    value,
149                    side: QueueSide::Right,
150                    priority,
151                }))
152            }
153            Token::Pop => {
154                self.advance()?;
155                let queue = self.expect_ident()?;
156                let count = if self.consume(&Token::Count)? {
157                    self.parse_integer()? as usize
158                } else {
159                    1
160                };
161                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
162                    queue,
163                    side: QueueSide::Left,
164                    count,
165                }))
166            }
167            Token::Peek => {
168                self.advance()?;
169                let queue = self.expect_ident()?;
170                // Accept either `PEEK <q> COUNT <n>` or a bare `PEEK <q> <n>`.
171                // `consume` has a side effect (advances past COUNT), so the
172                // short-circuit order matters: only peek for a bare integer
173                // when the COUNT keyword was absent.
174                let count =
175                    if self.consume(&Token::Count)? || matches!(self.peek(), Token::Integer(_)) {
176                        self.parse_integer()? as usize
177                    } else {
178                        1
179                    };
180                Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
181            }
182            Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
183                self.advance()?;
184                let queue = self.expect_ident()?;
185                Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
186            }
187            Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
188                self.advance()?;
189                self.expect(Token::From)?;
190                let source = self.expect_ident()?;
191                self.expect(Token::To)?;
192                let destination = self.expect_ident()?;
193                let filter = if self.consume(&Token::Where)? {
194                    Some(self.parse_filter()?)
195                } else {
196                    None
197                };
198                let limit = if self.consume(&Token::Limit)? {
199                    self.parse_positive_integer("LIMIT")? as usize
200                } else if filter.is_some() {
201                    return Err(ParseError::expected(
202                        vec!["LIMIT"],
203                        self.peek(),
204                        self.position(),
205                    ));
206                } else {
207                    1
208                };
209                Ok(QueryExpr::QueueCommand(QueueCommand::Move {
210                    source,
211                    destination,
212                    filter,
213                    limit,
214                }))
215            }
216            Token::Purge => {
217                self.advance()?;
218                let queue = self.expect_ident()?;
219                Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
220            }
221            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
222                self.advance()?;
223                let queue = self.expect_ident()?;
224                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
225                    queue,
226                    side: QueueSide::Left,
227                    count: 1,
228                }))
229            }
230            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
231                self.advance()?;
232                let queue = self.expect_ident()?;
233                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
234                    queue,
235                    side: QueueSide::Right,
236                    count: 1,
237                }))
238            }
239            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
240                self.advance()?;
241                let queue = self.expect_ident()?;
242                let value = self.parse_value()?;
243                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
244                    queue,
245                    value,
246                    side: QueueSide::Left,
247                    priority: None,
248                }))
249            }
250            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
251                self.advance()?;
252                let queue = self.expect_ident()?;
253                let value = self.parse_value()?;
254                let priority = if self.consume(&Token::Priority)? {
255                    Some(self.parse_integer()? as i32)
256                } else {
257                    None
258                };
259                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
260                    queue,
261                    value,
262                    side: QueueSide::Right,
263                    priority,
264                }))
265            }
266            Token::Group => {
267                self.advance()?;
268                self.expect(Token::Create)?;
269                let queue = self.expect_ident()?;
270                let group = self.expect_ident()?;
271                Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
272                    queue,
273                    group,
274                }))
275            }
276            Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
277                self.advance()?;
278                let queue = self.expect_ident()?;
279                let group = if self.consume(&Token::Group)? {
280                    Some(self.expect_ident()?)
281                } else {
282                    None
283                };
284                // CONSUMER consumer_name
285                if !self.consume_ident_ci("CONSUMER")? {
286                    return Err(ParseError::expected(
287                        vec!["CONSUMER"],
288                        self.peek(),
289                        self.position(),
290                    ));
291                }
292                let consumer = self.expect_ident()?;
293                let count = if self.consume(&Token::Count)? {
294                    self.parse_integer()? as usize
295                } else {
296                    1
297                };
298                Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
299                    queue,
300                    group,
301                    consumer,
302                    count,
303                }))
304            }
305            Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
306                self.advance()?;
307                let queue = self.expect_ident()?;
308                self.expect(Token::Group)?;
309                let group = self.expect_ident()?;
310                Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
311                    queue,
312                    group,
313                }))
314            }
315            Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
316                self.advance()?;
317                let queue = self.expect_ident()?;
318                self.expect(Token::Group)?;
319                let group = self.expect_ident()?;
320                if !self.consume_ident_ci("CONSUMER")? {
321                    return Err(ParseError::expected(
322                        vec!["CONSUMER"],
323                        self.peek(),
324                        self.position(),
325                    ));
326                }
327                let consumer = self.expect_ident()?;
328                if !self.consume_ident_ci("MIN_IDLE")? {
329                    return Err(ParseError::expected(
330                        vec!["MIN_IDLE"],
331                        self.peek(),
332                        self.position(),
333                    ));
334                }
335                let min_idle_ms = self.parse_integer()?.max(0) as u64;
336                Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
337                    queue,
338                    group,
339                    consumer,
340                    min_idle_ms,
341                }))
342            }
343            Token::Ack => {
344                self.advance()?;
345                let queue = self.expect_ident()?;
346                let (group, message_id, delivery_id) = self.parse_ack_nack_handle()?;
347                Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
348                    queue,
349                    group,
350                    message_id,
351                    delivery_id,
352                }))
353            }
354            Token::Nack => {
355                self.advance()?;
356                let queue = self.expect_ident()?;
357                let (group, message_id, delivery_id) = self.parse_ack_nack_handle()?;
358                Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
359                    queue,
360                    group,
361                    message_id,
362                    delivery_id,
363                }))
364            }
365            _ => Err(ParseError::expected(
366                vec![
367                    "PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
368                    "RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
369                ],
370                self.peek(),
371                self.position(),
372            )),
373        }
374    }
375
376    /// Parse the tuple/`delivery_id` handle that follows `ACK <queue>` /
377    /// `NACK <queue>`. Returns `(group, message_id, delivery_id)`:
378    ///
379    /// - `GROUP <group> '<message_id>'`               → legacy tuple, no delivery_id
380    /// - `GROUP <group> '<message_id>' WITH delivery_id = '<base32>'` → both, delivery_id wins (ADR 0026)
381    /// - `WITH delivery_id = '<base32>'`              → delivery_id only; group / message_id empty
382    ///
383    /// Refusing both is a parse error — at least one handle is required.
384    fn parse_ack_nack_handle(&mut self) -> Result<(String, String, Option<String>), ParseError> {
385        let (group, message_id) = if matches!(self.peek(), Token::Group) {
386            self.advance()?;
387            let group = self.expect_ident()?;
388            let message_id = self.parse_string()?;
389            (group, message_id)
390        } else {
391            (String::new(), String::new())
392        };
393        let delivery_id = if self.consume(&Token::With)? {
394            if !self.consume_ident_ci("delivery_id")? {
395                return Err(ParseError::expected(
396                    vec!["delivery_id"],
397                    self.peek(),
398                    self.position(),
399                ));
400            }
401            if !self.consume(&Token::Eq)? {
402                return Err(ParseError::expected(
403                    vec!["="],
404                    self.peek(),
405                    self.position(),
406                ));
407            }
408            Some(self.parse_string()?)
409        } else {
410            None
411        };
412        if group.is_empty() && delivery_id.is_none() {
413            return Err(ParseError::expected(
414                vec!["GROUP", "WITH delivery_id"],
415                self.peek(),
416                self.position(),
417            ));
418        }
419        Ok((group, message_id, delivery_id))
420    }
421
422    fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
423        match self.peek() {
424            Token::Work => {
425                self.advance()?;
426                Ok(Some(QueueMode::Work))
427            }
428            Token::Ident(name) => {
429                if let Some(mode) = QueueMode::parse(name) {
430                    self.advance()?;
431                    Ok(Some(mode))
432                } else {
433                    Ok(None)
434                }
435            }
436            _ => Ok(None),
437        }
438    }
439
440    fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
441        match self.consume_queue_mode()? {
442            Some(mode) => Ok(mode),
443            None => Err(ParseError::expected(
444                vec!["FANOUT", "WORK"],
445                self.peek(),
446                self.position(),
447            )),
448        }
449    }
450
451    /// Parse duration unit for queue TTL
452    fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
453        match self.peek().clone() {
454            Token::Ident(ref unit) => {
455                let mult = match unit.to_ascii_lowercase().as_str() {
456                    "ms" => 1.0,
457                    "s" | "sec" | "secs" => 1_000.0,
458                    "m" | "min" | "mins" => 60_000.0,
459                    "h" | "hr" | "hrs" => 3_600_000.0,
460                    "d" | "day" | "days" => 86_400_000.0,
461                    _ => return Ok(1_000.0),
462                };
463                self.advance()?;
464                Ok(mult)
465            }
466            _ => Ok(1_000.0),
467        }
468    }
469}