Skip to main content

reddb_server/storage/query/parser/
queue.rs

1//! Parser for QUEUE commands and CREATE/DROP QUEUE
2
3use super::super::ast::{
4    AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueCommand, QueueMode,
5    QueueSide, DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP, DEFAULT_QUEUE_LOCK_DEADLINE_MS,
6    DEFAULT_QUEUE_MAX_ATTEMPTS,
7};
8use super::super::lexer::Token;
9use super::error::ParseError;
10use super::Parser;
11
12impl<'a> Parser<'a> {
13    /// Parse CREATE QUEUE body (after CREATE QUEUE consumed)
14    pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
15        let if_not_exists = self.match_if_not_exists()?;
16        let name = self.expect_ident()?;
17
18        let mut mode = QueueMode::Work;
19        let mut priority = false;
20        let mut max_size = None;
21        let mut ttl_ms = None;
22        let mut dlq = None;
23        let mut max_attempts = DEFAULT_QUEUE_MAX_ATTEMPTS;
24        let mut lock_deadline_ms = DEFAULT_QUEUE_LOCK_DEADLINE_MS;
25        let mut in_flight_cap_per_group = DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP;
26
27        // Parse optional clauses in any order
28        loop {
29            if let Some(parsed_mode) = self.consume_queue_mode()? {
30                mode = parsed_mode;
31            } else if self.consume(&Token::Priority)? {
32                priority = true;
33            } else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
34                max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
35            } else if self.consume_ident_ci("MAX_ATTEMPTS")?
36                || self.consume_ident_ci("MAXATTEMPTS")?
37            {
38                max_attempts = self.parse_integer()?.max(1) as u32;
39            } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
40                lock_deadline_ms = self.parse_integer()?.max(1) as u64;
41            } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
42                in_flight_cap_per_group = self.parse_integer()?.max(1) as u32;
43            } else if self.consume(&Token::With)? {
44                if self.consume_ident_ci("EVENTS")? {
45                    return Err(ParseError::new(
46                        "queues cannot have event subscriptions".to_string(),
47                        self.position(),
48                    ));
49                } else if self.consume_ident_ci("TTL")? {
50                    let value = self.parse_float()?;
51                    let unit = self.parse_queue_duration_unit()?;
52                    ttl_ms = Some((value * unit) as u64);
53                } else if self.consume_ident_ci("DLQ")? {
54                    dlq = Some(self.expect_ident()?);
55                }
56            } else {
57                break;
58            }
59        }
60
61        Ok(QueryExpr::CreateQueue(CreateQueueQuery {
62            name,
63            mode,
64            priority,
65            max_size,
66            ttl_ms,
67            dlq,
68            max_attempts,
69            lock_deadline_ms,
70            in_flight_cap_per_group,
71            if_not_exists,
72        }))
73    }
74
75    /// Parse ALTER QUEUE body (after ALTER QUEUE consumed)
76    ///
77    /// Syntax: `ALTER QUEUE <name> SET <clause>` where `<clause>` is one of
78    ///   `MODE <WORK|FANOUT>`
79    ///   `MAX_ATTEMPTS <int>`
80    ///   `LOCK_DEADLINE_MS <int>`
81    ///   `IN_FLIGHT_CAP_PER_GROUP <int>`
82    ///   `DLQ <ident>`
83    pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
84        let name = self.expect_ident()?;
85        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
86            return Err(ParseError::expected(
87                vec!["SET"],
88                self.peek(),
89                self.position(),
90            ));
91        }
92
93        let mut alter = AlterQueueQuery {
94            name,
95            ..Default::default()
96        };
97
98        if self.consume(&Token::Mode)? || self.consume_ident_ci("MODE")? {
99            alter.mode = Some(self.parse_queue_mode()?);
100        } else if self.consume_ident_ci("MAX_ATTEMPTS")?
101            || self.consume_ident_ci("MAXATTEMPTS")?
102        {
103            alter.max_attempts = Some(self.parse_integer()?.max(1) as u32);
104        } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
105            alter.lock_deadline_ms = Some(self.parse_integer()?.max(1) as u64);
106        } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
107            alter.in_flight_cap_per_group = Some(self.parse_integer()?.max(1) as u32);
108        } else if self.consume_ident_ci("DLQ")? {
109            alter.dlq = Some(self.expect_ident()?);
110        } else {
111            return Err(ParseError::expected(
112                vec![
113                    "MODE",
114                    "MAX_ATTEMPTS",
115                    "LOCK_DEADLINE_MS",
116                    "IN_FLIGHT_CAP_PER_GROUP",
117                    "DLQ",
118                ],
119                self.peek(),
120                self.position(),
121            ));
122        }
123
124        Ok(QueryExpr::AlterQueue(alter))
125    }
126
127    /// Parse DROP QUEUE body (after DROP QUEUE consumed)
128    pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
129        let if_exists = self.match_if_exists()?;
130        let name = self.parse_drop_collection_name()?;
131        Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
132    }
133
134    /// Parse QUEUE subcommand (after QUEUE token consumed)
135    pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
136        self.expect(Token::Queue)?;
137
138        match self.peek().clone() {
139            Token::Push => {
140                self.advance()?;
141                let queue = self.expect_ident()?;
142                let value = self.parse_value()?;
143                let priority = if self.consume(&Token::Priority)? {
144                    Some(self.parse_integer()? as i32)
145                } else {
146                    None
147                };
148                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
149                    queue,
150                    value,
151                    side: QueueSide::Right,
152                    priority,
153                }))
154            }
155            Token::Pop => {
156                self.advance()?;
157                let queue = self.expect_ident()?;
158                let count = if self.consume(&Token::Count)? {
159                    self.parse_integer()? as usize
160                } else {
161                    1
162                };
163                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
164                    queue,
165                    side: QueueSide::Left,
166                    count,
167                }))
168            }
169            Token::Peek => {
170                self.advance()?;
171                let queue = self.expect_ident()?;
172                let count = if self.consume(&Token::Count)? {
173                    self.parse_integer()? as usize
174                } else if matches!(self.peek(), Token::Integer(_)) {
175                    self.parse_integer()? as usize
176                } else {
177                    1
178                };
179                Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
180            }
181            Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
182                self.advance()?;
183                let queue = self.expect_ident()?;
184                Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
185            }
186            Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
187                self.advance()?;
188                self.expect(Token::From)?;
189                let source = self.expect_ident()?;
190                self.expect(Token::To)?;
191                let destination = self.expect_ident()?;
192                let filter = if self.consume(&Token::Where)? {
193                    Some(self.parse_filter()?)
194                } else {
195                    None
196                };
197                let limit = if self.consume(&Token::Limit)? {
198                    self.parse_positive_integer("LIMIT")? as usize
199                } else if filter.is_some() {
200                    return Err(ParseError::expected(
201                        vec!["LIMIT"],
202                        self.peek(),
203                        self.position(),
204                    ));
205                } else {
206                    1
207                };
208                Ok(QueryExpr::QueueCommand(QueueCommand::Move {
209                    source,
210                    destination,
211                    filter,
212                    limit,
213                }))
214            }
215            Token::Purge => {
216                self.advance()?;
217                let queue = self.expect_ident()?;
218                Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
219            }
220            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
221                self.advance()?;
222                let queue = self.expect_ident()?;
223                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
224                    queue,
225                    side: QueueSide::Left,
226                    count: 1,
227                }))
228            }
229            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
230                self.advance()?;
231                let queue = self.expect_ident()?;
232                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
233                    queue,
234                    side: QueueSide::Right,
235                    count: 1,
236                }))
237            }
238            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
239                self.advance()?;
240                let queue = self.expect_ident()?;
241                let value = self.parse_value()?;
242                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
243                    queue,
244                    value,
245                    side: QueueSide::Left,
246                    priority: None,
247                }))
248            }
249            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
250                self.advance()?;
251                let queue = self.expect_ident()?;
252                let value = self.parse_value()?;
253                let priority = if self.consume(&Token::Priority)? {
254                    Some(self.parse_integer()? as i32)
255                } else {
256                    None
257                };
258                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
259                    queue,
260                    value,
261                    side: QueueSide::Right,
262                    priority,
263                }))
264            }
265            Token::Group => {
266                self.advance()?;
267                self.expect(Token::Create)?;
268                let queue = self.expect_ident()?;
269                let group = self.expect_ident()?;
270                Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
271                    queue,
272                    group,
273                }))
274            }
275            Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
276                self.advance()?;
277                let queue = self.expect_ident()?;
278                let group = if self.consume(&Token::Group)? {
279                    Some(self.expect_ident()?)
280                } else {
281                    None
282                };
283                // CONSUMER consumer_name
284                if !self.consume_ident_ci("CONSUMER")? {
285                    return Err(ParseError::expected(
286                        vec!["CONSUMER"],
287                        self.peek(),
288                        self.position(),
289                    ));
290                }
291                let consumer = self.expect_ident()?;
292                let count = if self.consume(&Token::Count)? {
293                    self.parse_integer()? as usize
294                } else {
295                    1
296                };
297                Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
298                    queue,
299                    group,
300                    consumer,
301                    count,
302                }))
303            }
304            Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
305                self.advance()?;
306                let queue = self.expect_ident()?;
307                self.expect(Token::Group)?;
308                let group = self.expect_ident()?;
309                Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
310                    queue,
311                    group,
312                }))
313            }
314            Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
315                self.advance()?;
316                let queue = self.expect_ident()?;
317                self.expect(Token::Group)?;
318                let group = self.expect_ident()?;
319                if !self.consume_ident_ci("CONSUMER")? {
320                    return Err(ParseError::expected(
321                        vec!["CONSUMER"],
322                        self.peek(),
323                        self.position(),
324                    ));
325                }
326                let consumer = self.expect_ident()?;
327                if !self.consume_ident_ci("MIN_IDLE")? {
328                    return Err(ParseError::expected(
329                        vec!["MIN_IDLE"],
330                        self.peek(),
331                        self.position(),
332                    ));
333                }
334                let min_idle_ms = self.parse_integer()?.max(0) as u64;
335                Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
336                    queue,
337                    group,
338                    consumer,
339                    min_idle_ms,
340                }))
341            }
342            Token::Ack => {
343                self.advance()?;
344                let queue = self.expect_ident()?;
345                self.expect(Token::Group)?;
346                let group = self.expect_ident()?;
347                let message_id = self.parse_string()?;
348                Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
349                    queue,
350                    group,
351                    message_id,
352                }))
353            }
354            Token::Nack => {
355                self.advance()?;
356                let queue = self.expect_ident()?;
357                self.expect(Token::Group)?;
358                let group = self.expect_ident()?;
359                let message_id = self.parse_string()?;
360                Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
361                    queue,
362                    group,
363                    message_id,
364                }))
365            }
366            _ => Err(ParseError::expected(
367                vec![
368                    "PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
369                    "RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
370                ],
371                self.peek(),
372                self.position(),
373            )),
374        }
375    }
376
377    fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
378        match self.peek() {
379            Token::Work => {
380                self.advance()?;
381                Ok(Some(QueueMode::Work))
382            }
383            Token::Ident(name) => {
384                if let Some(mode) = QueueMode::parse(name) {
385                    self.advance()?;
386                    Ok(Some(mode))
387                } else {
388                    Ok(None)
389                }
390            }
391            _ => Ok(None),
392        }
393    }
394
395    fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
396        match self.consume_queue_mode()? {
397            Some(mode) => Ok(mode),
398            None => Err(ParseError::expected(
399                vec!["FANOUT", "WORK"],
400                self.peek(),
401                self.position(),
402            )),
403        }
404    }
405
406    /// Parse duration unit for queue TTL
407    fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
408        match self.peek().clone() {
409            Token::Ident(ref unit) => {
410                let mult = match unit.to_ascii_lowercase().as_str() {
411                    "ms" => 1.0,
412                    "s" | "sec" | "secs" => 1_000.0,
413                    "m" | "min" | "mins" => 60_000.0,
414                    "h" | "hr" | "hrs" => 3_600_000.0,
415                    "d" | "day" | "days" => 86_400_000.0,
416                    _ => return Ok(1_000.0),
417                };
418                self.advance()?;
419                Ok(mult)
420            }
421            _ => Ok(1_000.0),
422        }
423    }
424}