reddb_server/storage/query/parser/
queue.rs1use super::super::ast::{
4 AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueCommand, QueueMode,
5 QueueSide,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10
11impl<'a> Parser<'a> {
12 pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
14 let if_not_exists = self.match_if_not_exists()?;
15 let name = self.expect_ident()?;
16
17 let mut mode = QueueMode::Work;
18 let mut priority = false;
19 let mut max_size = None;
20 let mut ttl_ms = None;
21 let mut dlq = None;
22 let mut max_attempts = 3u32;
23
24 loop {
26 if let Some(parsed_mode) = self.consume_queue_mode()? {
27 mode = parsed_mode;
28 } else if self.consume(&Token::Priority)? {
29 priority = true;
30 } else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
31 max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
32 } else if self.consume_ident_ci("MAX_ATTEMPTS")?
33 || self.consume_ident_ci("MAXATTEMPTS")?
34 {
35 max_attempts = self.parse_integer()?.max(1) as u32;
36 } else if self.consume(&Token::With)? {
37 if self.consume_ident_ci("EVENTS")? {
38 return Err(ParseError::new(
39 "queues cannot have event subscriptions".to_string(),
40 self.position(),
41 ));
42 } else if self.consume_ident_ci("TTL")? {
43 let value = self.parse_float()?;
44 let unit = self.parse_queue_duration_unit()?;
45 ttl_ms = Some((value * unit) as u64);
46 } else if self.consume_ident_ci("DLQ")? {
47 dlq = Some(self.expect_ident()?);
48 }
49 } else {
50 break;
51 }
52 }
53
54 Ok(QueryExpr::CreateQueue(CreateQueueQuery {
55 name,
56 mode,
57 priority,
58 max_size,
59 ttl_ms,
60 dlq,
61 max_attempts,
62 if_not_exists,
63 }))
64 }
65
66 pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
68 let name = self.expect_ident()?;
69 if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
70 return Err(ParseError::expected(
71 vec!["SET"],
72 self.peek(),
73 self.position(),
74 ));
75 }
76 if !self.consume(&Token::Mode)? && !self.consume_ident_ci("MODE")? {
77 return Err(ParseError::expected(
78 vec!["MODE"],
79 self.peek(),
80 self.position(),
81 ));
82 }
83 let mode = self.parse_queue_mode()?;
84 Ok(QueryExpr::AlterQueue(AlterQueueQuery { name, mode }))
85 }
86
87 pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
89 let if_exists = self.match_if_exists()?;
90 let name = self.parse_drop_collection_name()?;
91 Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
92 }
93
94 pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
96 self.expect(Token::Queue)?;
97
98 match self.peek().clone() {
99 Token::Push => {
100 self.advance()?;
101 let queue = self.expect_ident()?;
102 let value = self.parse_value()?;
103 let priority = if self.consume(&Token::Priority)? {
104 Some(self.parse_integer()? as i32)
105 } else {
106 None
107 };
108 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
109 queue,
110 value,
111 side: QueueSide::Right,
112 priority,
113 }))
114 }
115 Token::Pop => {
116 self.advance()?;
117 let queue = self.expect_ident()?;
118 let count = if self.consume(&Token::Count)? {
119 self.parse_integer()? as usize
120 } else {
121 1
122 };
123 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
124 queue,
125 side: QueueSide::Left,
126 count,
127 }))
128 }
129 Token::Peek => {
130 self.advance()?;
131 let queue = self.expect_ident()?;
132 let count = if self.consume(&Token::Count)? {
133 self.parse_integer()? as usize
134 } else if matches!(self.peek(), Token::Integer(_)) {
135 self.parse_integer()? as usize
136 } else {
137 1
138 };
139 Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
140 }
141 Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
142 self.advance()?;
143 let queue = self.expect_ident()?;
144 Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
145 }
146 Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
147 self.advance()?;
148 self.expect(Token::From)?;
149 let source = self.expect_ident()?;
150 self.expect(Token::To)?;
151 let destination = self.expect_ident()?;
152 let filter = if self.consume(&Token::Where)? {
153 Some(self.parse_filter()?)
154 } else {
155 None
156 };
157 let limit = if self.consume(&Token::Limit)? {
158 self.parse_positive_integer("LIMIT")? as usize
159 } else if filter.is_some() {
160 return Err(ParseError::expected(
161 vec!["LIMIT"],
162 self.peek(),
163 self.position(),
164 ));
165 } else {
166 1
167 };
168 Ok(QueryExpr::QueueCommand(QueueCommand::Move {
169 source,
170 destination,
171 filter,
172 limit,
173 }))
174 }
175 Token::Purge => {
176 self.advance()?;
177 let queue = self.expect_ident()?;
178 Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
179 }
180 Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
181 self.advance()?;
182 let queue = self.expect_ident()?;
183 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
184 queue,
185 side: QueueSide::Left,
186 count: 1,
187 }))
188 }
189 Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
190 self.advance()?;
191 let queue = self.expect_ident()?;
192 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
193 queue,
194 side: QueueSide::Right,
195 count: 1,
196 }))
197 }
198 Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
199 self.advance()?;
200 let queue = self.expect_ident()?;
201 let value = self.parse_value()?;
202 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
203 queue,
204 value,
205 side: QueueSide::Left,
206 priority: None,
207 }))
208 }
209 Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
210 self.advance()?;
211 let queue = self.expect_ident()?;
212 let value = self.parse_value()?;
213 let priority = if self.consume(&Token::Priority)? {
214 Some(self.parse_integer()? as i32)
215 } else {
216 None
217 };
218 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
219 queue,
220 value,
221 side: QueueSide::Right,
222 priority,
223 }))
224 }
225 Token::Group => {
226 self.advance()?;
227 self.expect(Token::Create)?;
228 let queue = self.expect_ident()?;
229 let group = self.expect_ident()?;
230 Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
231 queue,
232 group,
233 }))
234 }
235 Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
236 self.advance()?;
237 let queue = self.expect_ident()?;
238 let group = if self.consume(&Token::Group)? {
239 Some(self.expect_ident()?)
240 } else {
241 None
242 };
243 if !self.consume_ident_ci("CONSUMER")? {
245 return Err(ParseError::expected(
246 vec!["CONSUMER"],
247 self.peek(),
248 self.position(),
249 ));
250 }
251 let consumer = self.expect_ident()?;
252 let count = if self.consume(&Token::Count)? {
253 self.parse_integer()? as usize
254 } else {
255 1
256 };
257 Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
258 queue,
259 group,
260 consumer,
261 count,
262 }))
263 }
264 Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
265 self.advance()?;
266 let queue = self.expect_ident()?;
267 self.expect(Token::Group)?;
268 let group = self.expect_ident()?;
269 Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
270 queue,
271 group,
272 }))
273 }
274 Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
275 self.advance()?;
276 let queue = self.expect_ident()?;
277 self.expect(Token::Group)?;
278 let group = self.expect_ident()?;
279 if !self.consume_ident_ci("CONSUMER")? {
280 return Err(ParseError::expected(
281 vec!["CONSUMER"],
282 self.peek(),
283 self.position(),
284 ));
285 }
286 let consumer = self.expect_ident()?;
287 if !self.consume_ident_ci("MIN_IDLE")? {
288 return Err(ParseError::expected(
289 vec!["MIN_IDLE"],
290 self.peek(),
291 self.position(),
292 ));
293 }
294 let min_idle_ms = self.parse_integer()?.max(0) as u64;
295 Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
296 queue,
297 group,
298 consumer,
299 min_idle_ms,
300 }))
301 }
302 Token::Ack => {
303 self.advance()?;
304 let queue = self.expect_ident()?;
305 self.expect(Token::Group)?;
306 let group = self.expect_ident()?;
307 let message_id = self.parse_string()?;
308 Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
309 queue,
310 group,
311 message_id,
312 }))
313 }
314 Token::Nack => {
315 self.advance()?;
316 let queue = self.expect_ident()?;
317 self.expect(Token::Group)?;
318 let group = self.expect_ident()?;
319 let message_id = self.parse_string()?;
320 Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
321 queue,
322 group,
323 message_id,
324 }))
325 }
326 _ => Err(ParseError::expected(
327 vec![
328 "PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
329 "RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
330 ],
331 self.peek(),
332 self.position(),
333 )),
334 }
335 }
336
337 fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
338 match self.peek() {
339 Token::Work => {
340 self.advance()?;
341 Ok(Some(QueueMode::Work))
342 }
343 Token::Ident(name) => {
344 if let Some(mode) = QueueMode::parse(name) {
345 self.advance()?;
346 Ok(Some(mode))
347 } else {
348 Ok(None)
349 }
350 }
351 _ => Ok(None),
352 }
353 }
354
355 fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
356 match self.consume_queue_mode()? {
357 Some(mode) => Ok(mode),
358 None => Err(ParseError::expected(
359 vec!["FANOUT", "WORK"],
360 self.peek(),
361 self.position(),
362 )),
363 }
364 }
365
366 fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
368 match self.peek().clone() {
369 Token::Ident(ref unit) => {
370 let mult = match unit.to_ascii_lowercase().as_str() {
371 "ms" => 1.0,
372 "s" | "sec" | "secs" => 1_000.0,
373 "m" | "min" | "mins" => 60_000.0,
374 "h" | "hr" | "hrs" => 3_600_000.0,
375 "d" | "day" | "days" => 86_400_000.0,
376 _ => return Ok(1_000.0),
377 };
378 self.advance()?;
379 Ok(mult)
380 }
381 _ => Ok(1_000.0),
382 }
383 }
384}