Skip to main content

reddb_server/storage/query/parser/
queue.rs

1//! Parser for QUEUE commands and CREATE/DROP QUEUE
2
3use super::super::ast::{
4    AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueCommand, QueueMode,
5    QueueSide,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10
11impl<'a> Parser<'a> {
12    /// Parse CREATE QUEUE body (after CREATE QUEUE consumed)
13    pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
14        let if_not_exists = self.match_if_not_exists()?;
15        let name = self.expect_ident()?;
16
17        let mut mode = QueueMode::Work;
18        let mut priority = false;
19        let mut max_size = None;
20        let mut ttl_ms = None;
21        let mut dlq = None;
22        let mut max_attempts = 3u32;
23
24        // Parse optional clauses in any order
25        loop {
26            if let Some(parsed_mode) = self.consume_queue_mode()? {
27                mode = parsed_mode;
28            } else if self.consume(&Token::Priority)? {
29                priority = true;
30            } else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
31                max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
32            } else if self.consume_ident_ci("MAX_ATTEMPTS")?
33                || self.consume_ident_ci("MAXATTEMPTS")?
34            {
35                max_attempts = self.parse_integer()?.max(1) as u32;
36            } else if self.consume(&Token::With)? {
37                if self.consume_ident_ci("EVENTS")? {
38                    return Err(ParseError::new(
39                        "queues cannot have event subscriptions".to_string(),
40                        self.position(),
41                    ));
42                } else if self.consume_ident_ci("TTL")? {
43                    let value = self.parse_float()?;
44                    let unit = self.parse_queue_duration_unit()?;
45                    ttl_ms = Some((value * unit) as u64);
46                } else if self.consume_ident_ci("DLQ")? {
47                    dlq = Some(self.expect_ident()?);
48                }
49            } else {
50                break;
51            }
52        }
53
54        Ok(QueryExpr::CreateQueue(CreateQueueQuery {
55            name,
56            mode,
57            priority,
58            max_size,
59            ttl_ms,
60            dlq,
61            max_attempts,
62            if_not_exists,
63        }))
64    }
65
66    /// Parse ALTER QUEUE body (after ALTER QUEUE consumed)
67    pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
68        let name = self.expect_ident()?;
69        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
70            return Err(ParseError::expected(
71                vec!["SET"],
72                self.peek(),
73                self.position(),
74            ));
75        }
76        if !self.consume(&Token::Mode)? && !self.consume_ident_ci("MODE")? {
77            return Err(ParseError::expected(
78                vec!["MODE"],
79                self.peek(),
80                self.position(),
81            ));
82        }
83        let mode = self.parse_queue_mode()?;
84        Ok(QueryExpr::AlterQueue(AlterQueueQuery { name, mode }))
85    }
86
87    /// Parse DROP QUEUE body (after DROP QUEUE consumed)
88    pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
89        let if_exists = self.match_if_exists()?;
90        let name = self.parse_drop_collection_name()?;
91        Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
92    }
93
94    /// Parse QUEUE subcommand (after QUEUE token consumed)
95    pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
96        self.expect(Token::Queue)?;
97
98        match self.peek().clone() {
99            Token::Push => {
100                self.advance()?;
101                let queue = self.expect_ident()?;
102                let value = self.parse_value()?;
103                let priority = if self.consume(&Token::Priority)? {
104                    Some(self.parse_integer()? as i32)
105                } else {
106                    None
107                };
108                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
109                    queue,
110                    value,
111                    side: QueueSide::Right,
112                    priority,
113                }))
114            }
115            Token::Pop => {
116                self.advance()?;
117                let queue = self.expect_ident()?;
118                let count = if self.consume(&Token::Count)? {
119                    self.parse_integer()? as usize
120                } else {
121                    1
122                };
123                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
124                    queue,
125                    side: QueueSide::Left,
126                    count,
127                }))
128            }
129            Token::Peek => {
130                self.advance()?;
131                let queue = self.expect_ident()?;
132                let count = if self.consume(&Token::Count)? {
133                    self.parse_integer()? as usize
134                } else if matches!(self.peek(), Token::Integer(_)) {
135                    self.parse_integer()? as usize
136                } else {
137                    1
138                };
139                Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
140            }
141            Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
142                self.advance()?;
143                let queue = self.expect_ident()?;
144                Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
145            }
146            Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
147                self.advance()?;
148                self.expect(Token::From)?;
149                let source = self.expect_ident()?;
150                self.expect(Token::To)?;
151                let destination = self.expect_ident()?;
152                let filter = if self.consume(&Token::Where)? {
153                    Some(self.parse_filter()?)
154                } else {
155                    None
156                };
157                let limit = if self.consume(&Token::Limit)? {
158                    self.parse_positive_integer("LIMIT")? as usize
159                } else if filter.is_some() {
160                    return Err(ParseError::expected(
161                        vec!["LIMIT"],
162                        self.peek(),
163                        self.position(),
164                    ));
165                } else {
166                    1
167                };
168                Ok(QueryExpr::QueueCommand(QueueCommand::Move {
169                    source,
170                    destination,
171                    filter,
172                    limit,
173                }))
174            }
175            Token::Purge => {
176                self.advance()?;
177                let queue = self.expect_ident()?;
178                Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
179            }
180            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
181                self.advance()?;
182                let queue = self.expect_ident()?;
183                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
184                    queue,
185                    side: QueueSide::Left,
186                    count: 1,
187                }))
188            }
189            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
190                self.advance()?;
191                let queue = self.expect_ident()?;
192                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
193                    queue,
194                    side: QueueSide::Right,
195                    count: 1,
196                }))
197            }
198            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
199                self.advance()?;
200                let queue = self.expect_ident()?;
201                let value = self.parse_value()?;
202                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
203                    queue,
204                    value,
205                    side: QueueSide::Left,
206                    priority: None,
207                }))
208            }
209            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
210                self.advance()?;
211                let queue = self.expect_ident()?;
212                let value = self.parse_value()?;
213                let priority = if self.consume(&Token::Priority)? {
214                    Some(self.parse_integer()? as i32)
215                } else {
216                    None
217                };
218                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
219                    queue,
220                    value,
221                    side: QueueSide::Right,
222                    priority,
223                }))
224            }
225            Token::Group => {
226                self.advance()?;
227                self.expect(Token::Create)?;
228                let queue = self.expect_ident()?;
229                let group = self.expect_ident()?;
230                Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
231                    queue,
232                    group,
233                }))
234            }
235            Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
236                self.advance()?;
237                let queue = self.expect_ident()?;
238                let group = if self.consume(&Token::Group)? {
239                    Some(self.expect_ident()?)
240                } else {
241                    None
242                };
243                // CONSUMER consumer_name
244                if !self.consume_ident_ci("CONSUMER")? {
245                    return Err(ParseError::expected(
246                        vec!["CONSUMER"],
247                        self.peek(),
248                        self.position(),
249                    ));
250                }
251                let consumer = self.expect_ident()?;
252                let count = if self.consume(&Token::Count)? {
253                    self.parse_integer()? as usize
254                } else {
255                    1
256                };
257                Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
258                    queue,
259                    group,
260                    consumer,
261                    count,
262                }))
263            }
264            Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
265                self.advance()?;
266                let queue = self.expect_ident()?;
267                self.expect(Token::Group)?;
268                let group = self.expect_ident()?;
269                Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
270                    queue,
271                    group,
272                }))
273            }
274            Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
275                self.advance()?;
276                let queue = self.expect_ident()?;
277                self.expect(Token::Group)?;
278                let group = self.expect_ident()?;
279                if !self.consume_ident_ci("CONSUMER")? {
280                    return Err(ParseError::expected(
281                        vec!["CONSUMER"],
282                        self.peek(),
283                        self.position(),
284                    ));
285                }
286                let consumer = self.expect_ident()?;
287                if !self.consume_ident_ci("MIN_IDLE")? {
288                    return Err(ParseError::expected(
289                        vec!["MIN_IDLE"],
290                        self.peek(),
291                        self.position(),
292                    ));
293                }
294                let min_idle_ms = self.parse_integer()?.max(0) as u64;
295                Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
296                    queue,
297                    group,
298                    consumer,
299                    min_idle_ms,
300                }))
301            }
302            Token::Ack => {
303                self.advance()?;
304                let queue = self.expect_ident()?;
305                self.expect(Token::Group)?;
306                let group = self.expect_ident()?;
307                let message_id = self.parse_string()?;
308                Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
309                    queue,
310                    group,
311                    message_id,
312                }))
313            }
314            Token::Nack => {
315                self.advance()?;
316                let queue = self.expect_ident()?;
317                self.expect(Token::Group)?;
318                let group = self.expect_ident()?;
319                let message_id = self.parse_string()?;
320                Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
321                    queue,
322                    group,
323                    message_id,
324                }))
325            }
326            _ => Err(ParseError::expected(
327                vec![
328                    "PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
329                    "RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
330                ],
331                self.peek(),
332                self.position(),
333            )),
334        }
335    }
336
337    fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
338        match self.peek() {
339            Token::Work => {
340                self.advance()?;
341                Ok(Some(QueueMode::Work))
342            }
343            Token::Ident(name) => {
344                if let Some(mode) = QueueMode::parse(name) {
345                    self.advance()?;
346                    Ok(Some(mode))
347                } else {
348                    Ok(None)
349                }
350            }
351            _ => Ok(None),
352        }
353    }
354
355    fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
356        match self.consume_queue_mode()? {
357            Some(mode) => Ok(mode),
358            None => Err(ParseError::expected(
359                vec!["FANOUT", "WORK"],
360                self.peek(),
361                self.position(),
362            )),
363        }
364    }
365
366    /// Parse duration unit for queue TTL
367    fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
368        match self.peek().clone() {
369            Token::Ident(ref unit) => {
370                let mult = match unit.to_ascii_lowercase().as_str() {
371                    "ms" => 1.0,
372                    "s" | "sec" | "secs" => 1_000.0,
373                    "m" | "min" | "mins" => 60_000.0,
374                    "h" | "hr" | "hrs" => 3_600_000.0,
375                    "d" | "day" | "days" => 86_400_000.0,
376                    _ => return Ok(1_000.0),
377                };
378                self.advance()?;
379                Ok(mult)
380            }
381            _ => Ok(1_000.0),
382        }
383    }
384}