Skip to main content

reddb_server/storage/query/parser/
queue.rs

1//! Parser for QUEUE commands and CREATE/DROP QUEUE
2
3use super::super::ast::{
4    AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueCommand, QueueMode,
5    QueueSide, DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP, DEFAULT_QUEUE_LOCK_DEADLINE_MS,
6    DEFAULT_QUEUE_MAX_ATTEMPTS,
7};
8use super::super::lexer::Token;
9use super::error::ParseError;
10use super::Parser;
11
12impl<'a> Parser<'a> {
13    /// Parse CREATE QUEUE body (after CREATE QUEUE consumed)
14    pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
15        let if_not_exists = self.match_if_not_exists()?;
16        let name = self.expect_ident()?;
17
18        let mut mode = QueueMode::Work;
19        let mut priority = false;
20        let mut max_size = None;
21        let mut ttl_ms = None;
22        let mut dlq = None;
23        let mut max_attempts = DEFAULT_QUEUE_MAX_ATTEMPTS;
24        let mut lock_deadline_ms = DEFAULT_QUEUE_LOCK_DEADLINE_MS;
25        let mut in_flight_cap_per_group = DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP;
26
27        // Parse optional clauses in any order
28        loop {
29            if let Some(parsed_mode) = self.consume_queue_mode()? {
30                mode = parsed_mode;
31            } else if self.consume(&Token::Priority)? {
32                priority = true;
33            } else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
34                max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
35            } else if self.consume_ident_ci("MAX_ATTEMPTS")?
36                || self.consume_ident_ci("MAXATTEMPTS")?
37            {
38                max_attempts = self.parse_integer()?.max(1) as u32;
39            } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
40                lock_deadline_ms = self.parse_integer()?.max(1) as u64;
41            } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
42                in_flight_cap_per_group = self.parse_integer()?.max(1) as u32;
43            } else if self.consume(&Token::With)? {
44                if self.consume_ident_ci("EVENTS")? {
45                    return Err(ParseError::new(
46                        "queues cannot have event subscriptions".to_string(),
47                        self.position(),
48                    ));
49                } else if self.consume_ident_ci("TTL")? {
50                    let value = self.parse_float()?;
51                    let unit = self.parse_queue_duration_unit()?;
52                    ttl_ms = Some((value * unit) as u64);
53                } else if self.consume_ident_ci("DLQ")? {
54                    dlq = Some(self.expect_ident()?);
55                }
56            } else {
57                break;
58            }
59        }
60
61        Ok(QueryExpr::CreateQueue(CreateQueueQuery {
62            name,
63            mode,
64            priority,
65            max_size,
66            ttl_ms,
67            dlq,
68            max_attempts,
69            lock_deadline_ms,
70            in_flight_cap_per_group,
71            if_not_exists,
72        }))
73    }
74
75    /// Parse ALTER QUEUE body (after ALTER QUEUE consumed)
76    ///
77    /// Syntax: `ALTER QUEUE <name> SET <clause>` where `<clause>` is one of
78    ///   `MODE <WORK|FANOUT>`
79    ///   `MAX_ATTEMPTS <int>`
80    ///   `LOCK_DEADLINE_MS <int>`
81    ///   `IN_FLIGHT_CAP_PER_GROUP <int>`
82    ///   `DLQ <ident>`
83    pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
84        let name = self.expect_ident()?;
85        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
86            return Err(ParseError::expected(
87                vec!["SET"],
88                self.peek(),
89                self.position(),
90            ));
91        }
92
93        let mut alter = AlterQueueQuery {
94            name,
95            ..Default::default()
96        };
97
98        if self.consume(&Token::Mode)? || self.consume_ident_ci("MODE")? {
99            alter.mode = Some(self.parse_queue_mode()?);
100        } else if self.consume_ident_ci("MAX_ATTEMPTS")? || self.consume_ident_ci("MAXATTEMPTS")? {
101            alter.max_attempts = Some(self.parse_integer()?.max(1) as u32);
102        } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
103            alter.lock_deadline_ms = Some(self.parse_integer()?.max(1) as u64);
104        } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
105            alter.in_flight_cap_per_group = Some(self.parse_integer()?.max(1) as u32);
106        } else if self.consume_ident_ci("DLQ")? {
107            alter.dlq = Some(self.expect_ident()?);
108        } else {
109            return Err(ParseError::expected(
110                vec![
111                    "MODE",
112                    "MAX_ATTEMPTS",
113                    "LOCK_DEADLINE_MS",
114                    "IN_FLIGHT_CAP_PER_GROUP",
115                    "DLQ",
116                ],
117                self.peek(),
118                self.position(),
119            ));
120        }
121
122        Ok(QueryExpr::AlterQueue(alter))
123    }
124
125    /// Parse DROP QUEUE body (after DROP QUEUE consumed)
126    pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
127        let if_exists = self.match_if_exists()?;
128        let name = self.parse_drop_collection_name()?;
129        Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
130    }
131
132    /// Parse QUEUE subcommand (after QUEUE token consumed)
133    pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
134        self.expect(Token::Queue)?;
135
136        match self.peek().clone() {
137            Token::Push => {
138                self.advance()?;
139                let queue = self.expect_ident()?;
140                let value = self.parse_value()?;
141                let priority = if self.consume(&Token::Priority)? {
142                    Some(self.parse_integer()? as i32)
143                } else {
144                    None
145                };
146                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
147                    queue,
148                    value,
149                    side: QueueSide::Right,
150                    priority,
151                }))
152            }
153            Token::Pop => {
154                self.advance()?;
155                let queue = self.expect_ident()?;
156                let count = if self.consume(&Token::Count)? {
157                    self.parse_integer()? as usize
158                } else {
159                    1
160                };
161                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
162                    queue,
163                    side: QueueSide::Left,
164                    count,
165                }))
166            }
167            Token::Peek => {
168                self.advance()?;
169                let queue = self.expect_ident()?;
170                // Accept either `PEEK <q> COUNT <n>` or a bare `PEEK <q> <n>`.
171                // `consume` has a side effect (advances past COUNT), so the
172                // short-circuit order matters: only peek for a bare integer
173                // when the COUNT keyword was absent.
174                let count =
175                    if self.consume(&Token::Count)? || matches!(self.peek(), Token::Integer(_)) {
176                        self.parse_integer()? as usize
177                    } else {
178                        1
179                    };
180                Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
181            }
182            Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
183                self.advance()?;
184                let queue = self.expect_ident()?;
185                Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
186            }
187            Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
188                self.advance()?;
189                self.expect(Token::From)?;
190                let source = self.expect_ident()?;
191                self.expect(Token::To)?;
192                let destination = self.expect_ident()?;
193                let filter = if self.consume(&Token::Where)? {
194                    Some(self.parse_filter()?)
195                } else {
196                    None
197                };
198                let limit = if self.consume(&Token::Limit)? {
199                    self.parse_positive_integer("LIMIT")? as usize
200                } else if filter.is_some() {
201                    return Err(ParseError::expected(
202                        vec!["LIMIT"],
203                        self.peek(),
204                        self.position(),
205                    ));
206                } else {
207                    1
208                };
209                Ok(QueryExpr::QueueCommand(QueueCommand::Move {
210                    source,
211                    destination,
212                    filter,
213                    limit,
214                }))
215            }
216            Token::Purge => {
217                self.advance()?;
218                let queue = self.expect_ident()?;
219                Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
220            }
221            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
222                self.advance()?;
223                let queue = self.expect_ident()?;
224                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
225                    queue,
226                    side: QueueSide::Left,
227                    count: 1,
228                }))
229            }
230            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
231                self.advance()?;
232                let queue = self.expect_ident()?;
233                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
234                    queue,
235                    side: QueueSide::Right,
236                    count: 1,
237                }))
238            }
239            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
240                self.advance()?;
241                let queue = self.expect_ident()?;
242                let value = self.parse_value()?;
243                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
244                    queue,
245                    value,
246                    side: QueueSide::Left,
247                    priority: None,
248                }))
249            }
250            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
251                self.advance()?;
252                let queue = self.expect_ident()?;
253                let value = self.parse_value()?;
254                let priority = if self.consume(&Token::Priority)? {
255                    Some(self.parse_integer()? as i32)
256                } else {
257                    None
258                };
259                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
260                    queue,
261                    value,
262                    side: QueueSide::Right,
263                    priority,
264                }))
265            }
266            Token::Group => {
267                self.advance()?;
268                self.expect(Token::Create)?;
269                let queue = self.expect_ident()?;
270                let group = self.expect_ident()?;
271                Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
272                    queue,
273                    group,
274                }))
275            }
276            Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
277                self.advance()?;
278                let queue = self.expect_ident()?;
279                let group = if self.consume(&Token::Group)? {
280                    Some(self.expect_ident()?)
281                } else {
282                    None
283                };
284                // CONSUMER consumer_name
285                if !self.consume_ident_ci("CONSUMER")? {
286                    return Err(ParseError::expected(
287                        vec!["CONSUMER"],
288                        self.peek(),
289                        self.position(),
290                    ));
291                }
292                let consumer = self.expect_ident()?;
293                let count = if self.consume(&Token::Count)? {
294                    self.parse_integer()? as usize
295                } else {
296                    1
297                };
298                Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
299                    queue,
300                    group,
301                    consumer,
302                    count,
303                }))
304            }
305            Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
306                self.advance()?;
307                let queue = self.expect_ident()?;
308                self.expect(Token::Group)?;
309                let group = self.expect_ident()?;
310                Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
311                    queue,
312                    group,
313                }))
314            }
315            Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
316                self.advance()?;
317                let queue = self.expect_ident()?;
318                self.expect(Token::Group)?;
319                let group = self.expect_ident()?;
320                if !self.consume_ident_ci("CONSUMER")? {
321                    return Err(ParseError::expected(
322                        vec!["CONSUMER"],
323                        self.peek(),
324                        self.position(),
325                    ));
326                }
327                let consumer = self.expect_ident()?;
328                if !self.consume_ident_ci("MIN_IDLE")? {
329                    return Err(ParseError::expected(
330                        vec!["MIN_IDLE"],
331                        self.peek(),
332                        self.position(),
333                    ));
334                }
335                let min_idle_ms = self.parse_integer()?.max(0) as u64;
336                Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
337                    queue,
338                    group,
339                    consumer,
340                    min_idle_ms,
341                }))
342            }
343            Token::Ack => {
344                self.advance()?;
345                let queue = self.expect_ident()?;
346                self.expect(Token::Group)?;
347                let group = self.expect_ident()?;
348                let message_id = self.parse_string()?;
349                Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
350                    queue,
351                    group,
352                    message_id,
353                }))
354            }
355            Token::Nack => {
356                self.advance()?;
357                let queue = self.expect_ident()?;
358                self.expect(Token::Group)?;
359                let group = self.expect_ident()?;
360                let message_id = self.parse_string()?;
361                Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
362                    queue,
363                    group,
364                    message_id,
365                }))
366            }
367            _ => Err(ParseError::expected(
368                vec![
369                    "PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
370                    "RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
371                ],
372                self.peek(),
373                self.position(),
374            )),
375        }
376    }
377
378    fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
379        match self.peek() {
380            Token::Work => {
381                self.advance()?;
382                Ok(Some(QueueMode::Work))
383            }
384            Token::Ident(name) => {
385                if let Some(mode) = QueueMode::parse(name) {
386                    self.advance()?;
387                    Ok(Some(mode))
388                } else {
389                    Ok(None)
390                }
391            }
392            _ => Ok(None),
393        }
394    }
395
396    fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
397        match self.consume_queue_mode()? {
398            Some(mode) => Ok(mode),
399            None => Err(ParseError::expected(
400                vec!["FANOUT", "WORK"],
401                self.peek(),
402                self.position(),
403            )),
404        }
405    }
406
407    /// Parse duration unit for queue TTL
408    fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
409        match self.peek().clone() {
410            Token::Ident(ref unit) => {
411                let mult = match unit.to_ascii_lowercase().as_str() {
412                    "ms" => 1.0,
413                    "s" | "sec" | "secs" => 1_000.0,
414                    "m" | "min" | "mins" => 60_000.0,
415                    "h" | "hr" | "hrs" => 3_600_000.0,
416                    "d" | "day" | "days" => 86_400_000.0,
417                    _ => return Ok(1_000.0),
418                };
419                self.advance()?;
420                Ok(mult)
421            }
422            _ => Ok(1_000.0),
423        }
424    }
425}