Skip to main content

reddb_server/storage/query/parser/
queue.rs

1//! Parser for QUEUE commands and CREATE/DROP QUEUE
2
3use super::super::ast::{
4    AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueCommand, QueueMode,
5    QueueSide,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10
11impl<'a> Parser<'a> {
12    /// Parse CREATE QUEUE body (after CREATE QUEUE consumed)
13    pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
14        let if_not_exists = self.match_if_not_exists()?;
15        let name = self.expect_ident()?;
16
17        let mut mode = QueueMode::Work;
18        let mut priority = false;
19        let mut max_size = None;
20        let mut ttl_ms = None;
21        let mut dlq = None;
22        let mut max_attempts = 3u32;
23
24        // Parse optional clauses in any order
25        loop {
26            if let Some(parsed_mode) = self.consume_queue_mode()? {
27                mode = parsed_mode;
28            } else if self.consume(&Token::Priority)? {
29                priority = true;
30            } else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
31                max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
32            } else if self.consume_ident_ci("MAX_ATTEMPTS")?
33                || self.consume_ident_ci("MAXATTEMPTS")?
34            {
35                max_attempts = self.parse_integer()?.max(1) as u32;
36            } else if self.consume(&Token::With)? {
37                if self.consume_ident_ci("EVENTS")? {
38                    return Err(ParseError::new(
39                        "queues cannot have event subscriptions".to_string(),
40                        self.position(),
41                    ));
42                } else if self.consume_ident_ci("TTL")? {
43                    let value = self.parse_float()?;
44                    let unit = self.parse_queue_duration_unit()?;
45                    ttl_ms = Some((value * unit) as u64);
46                } else if self.consume_ident_ci("DLQ")? {
47                    dlq = Some(self.expect_ident()?);
48                }
49            } else {
50                break;
51            }
52        }
53
54        Ok(QueryExpr::CreateQueue(CreateQueueQuery {
55            name,
56            mode,
57            priority,
58            max_size,
59            ttl_ms,
60            dlq,
61            max_attempts,
62            if_not_exists,
63        }))
64    }
65
66    /// Parse ALTER QUEUE body (after ALTER QUEUE consumed)
67    pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
68        let name = self.expect_ident()?;
69        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
70            return Err(ParseError::expected(
71                vec!["SET"],
72                self.peek(),
73                self.position(),
74            ));
75        }
76        if !self.consume(&Token::Mode)? && !self.consume_ident_ci("MODE")? {
77            return Err(ParseError::expected(
78                vec!["MODE"],
79                self.peek(),
80                self.position(),
81            ));
82        }
83        let mode = self.parse_queue_mode()?;
84        Ok(QueryExpr::AlterQueue(AlterQueueQuery { name, mode }))
85    }
86
87    /// Parse DROP QUEUE body (after DROP QUEUE consumed)
88    pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
89        let if_exists = self.match_if_exists()?;
90        let name = self.parse_drop_collection_name()?;
91        Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
92    }
93
94    /// Parse QUEUE subcommand (after QUEUE token consumed)
95    pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
96        self.expect(Token::Queue)?;
97
98        match self.peek().clone() {
99            Token::Push => {
100                self.advance()?;
101                let queue = self.expect_ident()?;
102                let value = self.parse_value()?;
103                let priority = if self.consume(&Token::Priority)? {
104                    Some(self.parse_integer()? as i32)
105                } else {
106                    None
107                };
108                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
109                    queue,
110                    value,
111                    side: QueueSide::Right,
112                    priority,
113                }))
114            }
115            Token::Pop => {
116                self.advance()?;
117                let queue = self.expect_ident()?;
118                let count = if self.consume(&Token::Count)? {
119                    self.parse_integer()? as usize
120                } else {
121                    1
122                };
123                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
124                    queue,
125                    side: QueueSide::Left,
126                    count,
127                }))
128            }
129            Token::Peek => {
130                self.advance()?;
131                let queue = self.expect_ident()?;
132                let count = if matches!(self.peek(), Token::Integer(_)) {
133                    self.parse_integer()? as usize
134                } else {
135                    1
136                };
137                Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
138            }
139            Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
140                self.advance()?;
141                let queue = self.expect_ident()?;
142                Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
143            }
144            Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
145                self.advance()?;
146                self.expect(Token::From)?;
147                let source = self.expect_ident()?;
148                self.expect(Token::To)?;
149                let destination = self.expect_ident()?;
150                let filter = if self.consume(&Token::Where)? {
151                    Some(self.parse_filter()?)
152                } else {
153                    None
154                };
155                let limit = if self.consume(&Token::Limit)? {
156                    self.parse_positive_integer("LIMIT")? as usize
157                } else if filter.is_some() {
158                    return Err(ParseError::expected(
159                        vec!["LIMIT"],
160                        self.peek(),
161                        self.position(),
162                    ));
163                } else {
164                    1
165                };
166                Ok(QueryExpr::QueueCommand(QueueCommand::Move {
167                    source,
168                    destination,
169                    filter,
170                    limit,
171                }))
172            }
173            Token::Purge => {
174                self.advance()?;
175                let queue = self.expect_ident()?;
176                Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
177            }
178            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
179                self.advance()?;
180                let queue = self.expect_ident()?;
181                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
182                    queue,
183                    side: QueueSide::Left,
184                    count: 1,
185                }))
186            }
187            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
188                self.advance()?;
189                let queue = self.expect_ident()?;
190                Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
191                    queue,
192                    side: QueueSide::Right,
193                    count: 1,
194                }))
195            }
196            Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
197                self.advance()?;
198                let queue = self.expect_ident()?;
199                let value = self.parse_value()?;
200                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
201                    queue,
202                    value,
203                    side: QueueSide::Left,
204                    priority: None,
205                }))
206            }
207            Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
208                self.advance()?;
209                let queue = self.expect_ident()?;
210                let value = self.parse_value()?;
211                let priority = if self.consume(&Token::Priority)? {
212                    Some(self.parse_integer()? as i32)
213                } else {
214                    None
215                };
216                Ok(QueryExpr::QueueCommand(QueueCommand::Push {
217                    queue,
218                    value,
219                    side: QueueSide::Right,
220                    priority,
221                }))
222            }
223            Token::Group => {
224                self.advance()?;
225                self.expect(Token::Create)?;
226                let queue = self.expect_ident()?;
227                let group = self.expect_ident()?;
228                Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
229                    queue,
230                    group,
231                }))
232            }
233            Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
234                self.advance()?;
235                let queue = self.expect_ident()?;
236                let group = if self.consume(&Token::Group)? {
237                    Some(self.expect_ident()?)
238                } else {
239                    None
240                };
241                // CONSUMER consumer_name
242                if !self.consume_ident_ci("CONSUMER")? {
243                    return Err(ParseError::expected(
244                        vec!["CONSUMER"],
245                        self.peek(),
246                        self.position(),
247                    ));
248                }
249                let consumer = self.expect_ident()?;
250                let count = if self.consume(&Token::Count)? {
251                    self.parse_integer()? as usize
252                } else {
253                    1
254                };
255                Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
256                    queue,
257                    group,
258                    consumer,
259                    count,
260                }))
261            }
262            Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
263                self.advance()?;
264                let queue = self.expect_ident()?;
265                self.expect(Token::Group)?;
266                let group = self.expect_ident()?;
267                Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
268                    queue,
269                    group,
270                }))
271            }
272            Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
273                self.advance()?;
274                let queue = self.expect_ident()?;
275                self.expect(Token::Group)?;
276                let group = self.expect_ident()?;
277                if !self.consume_ident_ci("CONSUMER")? {
278                    return Err(ParseError::expected(
279                        vec!["CONSUMER"],
280                        self.peek(),
281                        self.position(),
282                    ));
283                }
284                let consumer = self.expect_ident()?;
285                if !self.consume_ident_ci("MIN_IDLE")? {
286                    return Err(ParseError::expected(
287                        vec!["MIN_IDLE"],
288                        self.peek(),
289                        self.position(),
290                    ));
291                }
292                let min_idle_ms = self.parse_integer()?.max(0) as u64;
293                Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
294                    queue,
295                    group,
296                    consumer,
297                    min_idle_ms,
298                }))
299            }
300            Token::Ack => {
301                self.advance()?;
302                let queue = self.expect_ident()?;
303                self.expect(Token::Group)?;
304                let group = self.expect_ident()?;
305                let message_id = self.parse_string()?;
306                Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
307                    queue,
308                    group,
309                    message_id,
310                }))
311            }
312            Token::Nack => {
313                self.advance()?;
314                let queue = self.expect_ident()?;
315                self.expect(Token::Group)?;
316                let group = self.expect_ident()?;
317                let message_id = self.parse_string()?;
318                Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
319                    queue,
320                    group,
321                    message_id,
322                }))
323            }
324            _ => Err(ParseError::expected(
325                vec![
326                    "PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
327                    "RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
328                ],
329                self.peek(),
330                self.position(),
331            )),
332        }
333    }
334
335    fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
336        match self.peek() {
337            Token::Work => {
338                self.advance()?;
339                Ok(Some(QueueMode::Work))
340            }
341            Token::Ident(name) => {
342                if let Some(mode) = QueueMode::parse(name) {
343                    self.advance()?;
344                    Ok(Some(mode))
345                } else {
346                    Ok(None)
347                }
348            }
349            _ => Ok(None),
350        }
351    }
352
353    fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
354        match self.consume_queue_mode()? {
355            Some(mode) => Ok(mode),
356            None => Err(ParseError::expected(
357                vec!["FANOUT", "WORK"],
358                self.peek(),
359                self.position(),
360            )),
361        }
362    }
363
364    /// Parse duration unit for queue TTL
365    fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
366        match self.peek().clone() {
367            Token::Ident(ref unit) => {
368                let mult = match unit.to_ascii_lowercase().as_str() {
369                    "ms" => 1.0,
370                    "s" | "sec" | "secs" => 1_000.0,
371                    "m" | "min" | "mins" => 60_000.0,
372                    "h" | "hr" | "hrs" => 3_600_000.0,
373                    "d" | "day" | "days" => 86_400_000.0,
374                    _ => return Ok(1_000.0),
375                };
376                self.advance()?;
377                Ok(mult)
378            }
379            _ => Ok(1_000.0),
380        }
381    }
382}