reddb_server/storage/query/parser/
queue.rs1use super::super::ast::{
4 AlterQueueQuery, CreateQueueQuery, DropQueueQuery, QueryExpr, QueueCommand, QueueMode,
5 QueueSide, DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP, DEFAULT_QUEUE_LOCK_DEADLINE_MS,
6 DEFAULT_QUEUE_MAX_ATTEMPTS,
7};
8use super::super::lexer::Token;
9use super::error::ParseError;
10use super::Parser;
11
12impl<'a> Parser<'a> {
13 pub fn parse_create_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
15 let if_not_exists = self.match_if_not_exists()?;
16 let name = self.expect_ident()?;
17
18 let mut mode = QueueMode::Work;
19 let mut priority = false;
20 let mut max_size = None;
21 let mut ttl_ms = None;
22 let mut dlq = None;
23 let mut max_attempts = DEFAULT_QUEUE_MAX_ATTEMPTS;
24 let mut lock_deadline_ms = DEFAULT_QUEUE_LOCK_DEADLINE_MS;
25 let mut in_flight_cap_per_group = DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP;
26
27 loop {
29 if let Some(parsed_mode) = self.consume_queue_mode()? {
30 mode = parsed_mode;
31 } else if self.consume(&Token::Priority)? {
32 priority = true;
33 } else if self.consume_ident_ci("MAX_SIZE")? || self.consume_ident_ci("MAXSIZE")? {
34 max_size = Some(self.parse_positive_integer("MAX_SIZE")? as usize);
35 } else if self.consume_ident_ci("MAX_ATTEMPTS")?
36 || self.consume_ident_ci("MAXATTEMPTS")?
37 {
38 max_attempts = self.parse_integer()?.max(1) as u32;
39 } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
40 lock_deadline_ms = self.parse_integer()?.max(1) as u64;
41 } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
42 in_flight_cap_per_group = self.parse_integer()?.max(1) as u32;
43 } else if self.consume(&Token::With)? {
44 if self.consume_ident_ci("EVENTS")? {
45 return Err(ParseError::new(
46 "queues cannot have event subscriptions".to_string(),
47 self.position(),
48 ));
49 } else if self.consume_ident_ci("TTL")? {
50 let value = self.parse_float()?;
51 let unit = self.parse_queue_duration_unit()?;
52 ttl_ms = Some((value * unit) as u64);
53 } else if self.consume_ident_ci("DLQ")? {
54 dlq = Some(self.expect_ident()?);
55 }
56 } else {
57 break;
58 }
59 }
60
61 Ok(QueryExpr::CreateQueue(CreateQueueQuery {
62 name,
63 mode,
64 priority,
65 max_size,
66 ttl_ms,
67 dlq,
68 max_attempts,
69 lock_deadline_ms,
70 in_flight_cap_per_group,
71 if_not_exists,
72 }))
73 }
74
75 pub fn parse_alter_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
84 let name = self.expect_ident()?;
85 if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
86 return Err(ParseError::expected(
87 vec!["SET"],
88 self.peek(),
89 self.position(),
90 ));
91 }
92
93 let mut alter = AlterQueueQuery {
94 name,
95 ..Default::default()
96 };
97
98 if self.consume(&Token::Mode)? || self.consume_ident_ci("MODE")? {
99 alter.mode = Some(self.parse_queue_mode()?);
100 } else if self.consume_ident_ci("MAX_ATTEMPTS")? || self.consume_ident_ci("MAXATTEMPTS")? {
101 alter.max_attempts = Some(self.parse_integer()?.max(1) as u32);
102 } else if self.consume_ident_ci("LOCK_DEADLINE_MS")? {
103 alter.lock_deadline_ms = Some(self.parse_integer()?.max(1) as u64);
104 } else if self.consume_ident_ci("IN_FLIGHT_CAP_PER_GROUP")? {
105 alter.in_flight_cap_per_group = Some(self.parse_integer()?.max(1) as u32);
106 } else if self.consume_ident_ci("DLQ")? {
107 alter.dlq = Some(self.expect_ident()?);
108 } else {
109 return Err(ParseError::expected(
110 vec![
111 "MODE",
112 "MAX_ATTEMPTS",
113 "LOCK_DEADLINE_MS",
114 "IN_FLIGHT_CAP_PER_GROUP",
115 "DLQ",
116 ],
117 self.peek(),
118 self.position(),
119 ));
120 }
121
122 Ok(QueryExpr::AlterQueue(alter))
123 }
124
125 pub fn parse_drop_queue_body(&mut self) -> Result<QueryExpr, ParseError> {
127 let if_exists = self.match_if_exists()?;
128 let name = self.parse_drop_collection_name()?;
129 Ok(QueryExpr::DropQueue(DropQueueQuery { name, if_exists }))
130 }
131
132 pub fn parse_queue_command(&mut self) -> Result<QueryExpr, ParseError> {
134 self.expect(Token::Queue)?;
135
136 match self.peek().clone() {
137 Token::Push => {
138 self.advance()?;
139 let queue = self.expect_ident()?;
140 let value = self.parse_value()?;
141 let priority = if self.consume(&Token::Priority)? {
142 Some(self.parse_integer()? as i32)
143 } else {
144 None
145 };
146 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
147 queue,
148 value,
149 side: QueueSide::Right,
150 priority,
151 }))
152 }
153 Token::Pop => {
154 self.advance()?;
155 let queue = self.expect_ident()?;
156 let count = if self.consume(&Token::Count)? {
157 self.parse_integer()? as usize
158 } else {
159 1
160 };
161 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
162 queue,
163 side: QueueSide::Left,
164 count,
165 }))
166 }
167 Token::Peek => {
168 self.advance()?;
169 let queue = self.expect_ident()?;
170 let count =
175 if self.consume(&Token::Count)? || matches!(self.peek(), Token::Integer(_)) {
176 self.parse_integer()? as usize
177 } else {
178 1
179 };
180 Ok(QueryExpr::QueueCommand(QueueCommand::Peek { queue, count }))
181 }
182 Token::Ident(ref name) if name.eq_ignore_ascii_case("LEN") => {
183 self.advance()?;
184 let queue = self.expect_ident()?;
185 Ok(QueryExpr::QueueCommand(QueueCommand::Len { queue }))
186 }
187 Token::Ident(ref name) if name.eq_ignore_ascii_case("MOVE") => {
188 self.advance()?;
189 self.expect(Token::From)?;
190 let source = self.expect_ident()?;
191 self.expect(Token::To)?;
192 let destination = self.expect_ident()?;
193 let filter = if self.consume(&Token::Where)? {
194 Some(self.parse_filter()?)
195 } else {
196 None
197 };
198 let limit = if self.consume(&Token::Limit)? {
199 self.parse_positive_integer("LIMIT")? as usize
200 } else if filter.is_some() {
201 return Err(ParseError::expected(
202 vec!["LIMIT"],
203 self.peek(),
204 self.position(),
205 ));
206 } else {
207 1
208 };
209 Ok(QueryExpr::QueueCommand(QueueCommand::Move {
210 source,
211 destination,
212 filter,
213 limit,
214 }))
215 }
216 Token::Purge => {
217 self.advance()?;
218 let queue = self.expect_ident()?;
219 Ok(QueryExpr::QueueCommand(QueueCommand::Purge { queue }))
220 }
221 Token::Ident(ref name) if name.eq_ignore_ascii_case("LPOP") => {
222 self.advance()?;
223 let queue = self.expect_ident()?;
224 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
225 queue,
226 side: QueueSide::Left,
227 count: 1,
228 }))
229 }
230 Token::Ident(ref name) if name.eq_ignore_ascii_case("RPOP") => {
231 self.advance()?;
232 let queue = self.expect_ident()?;
233 Ok(QueryExpr::QueueCommand(QueueCommand::Pop {
234 queue,
235 side: QueueSide::Right,
236 count: 1,
237 }))
238 }
239 Token::Ident(ref name) if name.eq_ignore_ascii_case("LPUSH") => {
240 self.advance()?;
241 let queue = self.expect_ident()?;
242 let value = self.parse_value()?;
243 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
244 queue,
245 value,
246 side: QueueSide::Left,
247 priority: None,
248 }))
249 }
250 Token::Ident(ref name) if name.eq_ignore_ascii_case("RPUSH") => {
251 self.advance()?;
252 let queue = self.expect_ident()?;
253 let value = self.parse_value()?;
254 let priority = if self.consume(&Token::Priority)? {
255 Some(self.parse_integer()? as i32)
256 } else {
257 None
258 };
259 Ok(QueryExpr::QueueCommand(QueueCommand::Push {
260 queue,
261 value,
262 side: QueueSide::Right,
263 priority,
264 }))
265 }
266 Token::Group => {
267 self.advance()?;
268 self.expect(Token::Create)?;
269 let queue = self.expect_ident()?;
270 let group = self.expect_ident()?;
271 Ok(QueryExpr::QueueCommand(QueueCommand::GroupCreate {
272 queue,
273 group,
274 }))
275 }
276 Token::Ident(ref name) if name.eq_ignore_ascii_case("READ") => {
277 self.advance()?;
278 let queue = self.expect_ident()?;
279 let group = if self.consume(&Token::Group)? {
280 Some(self.expect_ident()?)
281 } else {
282 None
283 };
284 if !self.consume_ident_ci("CONSUMER")? {
286 return Err(ParseError::expected(
287 vec!["CONSUMER"],
288 self.peek(),
289 self.position(),
290 ));
291 }
292 let consumer = self.expect_ident()?;
293 let count = if self.consume(&Token::Count)? {
294 self.parse_integer()? as usize
295 } else {
296 1
297 };
298 Ok(QueryExpr::QueueCommand(QueueCommand::GroupRead {
299 queue,
300 group,
301 consumer,
302 count,
303 }))
304 }
305 Token::Ident(ref name) if name.eq_ignore_ascii_case("PENDING") => {
306 self.advance()?;
307 let queue = self.expect_ident()?;
308 self.expect(Token::Group)?;
309 let group = self.expect_ident()?;
310 Ok(QueryExpr::QueueCommand(QueueCommand::Pending {
311 queue,
312 group,
313 }))
314 }
315 Token::Ident(ref name) if name.eq_ignore_ascii_case("CLAIM") => {
316 self.advance()?;
317 let queue = self.expect_ident()?;
318 self.expect(Token::Group)?;
319 let group = self.expect_ident()?;
320 if !self.consume_ident_ci("CONSUMER")? {
321 return Err(ParseError::expected(
322 vec!["CONSUMER"],
323 self.peek(),
324 self.position(),
325 ));
326 }
327 let consumer = self.expect_ident()?;
328 if !self.consume_ident_ci("MIN_IDLE")? {
329 return Err(ParseError::expected(
330 vec!["MIN_IDLE"],
331 self.peek(),
332 self.position(),
333 ));
334 }
335 let min_idle_ms = self.parse_integer()?.max(0) as u64;
336 Ok(QueryExpr::QueueCommand(QueueCommand::Claim {
337 queue,
338 group,
339 consumer,
340 min_idle_ms,
341 }))
342 }
343 Token::Ack => {
344 self.advance()?;
345 let queue = self.expect_ident()?;
346 let (group, message_id, delivery_id) = self.parse_ack_nack_handle()?;
347 Ok(QueryExpr::QueueCommand(QueueCommand::Ack {
348 queue,
349 group,
350 message_id,
351 delivery_id,
352 }))
353 }
354 Token::Nack => {
355 self.advance()?;
356 let queue = self.expect_ident()?;
357 let (group, message_id, delivery_id) = self.parse_ack_nack_handle()?;
358 Ok(QueryExpr::QueueCommand(QueueCommand::Nack {
359 queue,
360 group,
361 message_id,
362 delivery_id,
363 }))
364 }
365 _ => Err(ParseError::expected(
366 vec![
367 "PUSH", "POP", "PEEK", "LEN", "PURGE", "GROUP", "READ", "ACK", "NACK", "LPUSH",
368 "RPUSH", "LPOP", "RPOP", "PENDING", "CLAIM", "MOVE",
369 ],
370 self.peek(),
371 self.position(),
372 )),
373 }
374 }
375
376 fn parse_ack_nack_handle(&mut self) -> Result<(String, String, Option<String>), ParseError> {
385 let (group, message_id) = if matches!(self.peek(), Token::Group) {
386 self.advance()?;
387 let group = self.expect_ident()?;
388 let message_id = self.parse_string()?;
389 (group, message_id)
390 } else {
391 (String::new(), String::new())
392 };
393 let delivery_id = if self.consume(&Token::With)? {
394 if !self.consume_ident_ci("delivery_id")? {
395 return Err(ParseError::expected(
396 vec!["delivery_id"],
397 self.peek(),
398 self.position(),
399 ));
400 }
401 if !self.consume(&Token::Eq)? {
402 return Err(ParseError::expected(
403 vec!["="],
404 self.peek(),
405 self.position(),
406 ));
407 }
408 Some(self.parse_string()?)
409 } else {
410 None
411 };
412 if group.is_empty() && delivery_id.is_none() {
413 return Err(ParseError::expected(
414 vec!["GROUP", "WITH delivery_id"],
415 self.peek(),
416 self.position(),
417 ));
418 }
419 Ok((group, message_id, delivery_id))
420 }
421
422 fn consume_queue_mode(&mut self) -> Result<Option<QueueMode>, ParseError> {
423 match self.peek() {
424 Token::Work => {
425 self.advance()?;
426 Ok(Some(QueueMode::Work))
427 }
428 Token::Ident(name) => {
429 if let Some(mode) = QueueMode::parse(name) {
430 self.advance()?;
431 Ok(Some(mode))
432 } else {
433 Ok(None)
434 }
435 }
436 _ => Ok(None),
437 }
438 }
439
440 fn parse_queue_mode(&mut self) -> Result<QueueMode, ParseError> {
441 match self.consume_queue_mode()? {
442 Some(mode) => Ok(mode),
443 None => Err(ParseError::expected(
444 vec!["FANOUT", "WORK"],
445 self.peek(),
446 self.position(),
447 )),
448 }
449 }
450
451 fn parse_queue_duration_unit(&mut self) -> Result<f64, ParseError> {
453 match self.peek().clone() {
454 Token::Ident(ref unit) => {
455 let mult = match unit.to_ascii_lowercase().as_str() {
456 "ms" => 1.0,
457 "s" | "sec" | "secs" => 1_000.0,
458 "m" | "min" | "mins" => 60_000.0,
459 "h" | "hr" | "hrs" => 3_600_000.0,
460 "d" | "day" | "days" => 86_400_000.0,
461 _ => return Ok(1_000.0),
462 };
463 self.advance()?;
464 Ok(mult)
465 }
466 _ => Ok(1_000.0),
467 }
468 }
469}